关于后端:延时任务之zookeeper

38次阅读

共计 17662 个字符,预计需要花费 45 分钟才能阅读完成。

前言
实现延时工作的关键点,是要存储工作的形容和工作的执行工夫,还要能依据工作执行工夫进行排序,那么咱们可不可以应用 zookeeper 来实现延时工作呢?答案当然是必定的。要晓得,zookeeper 的 znode 同样能够用来存储数据,那么咱们就能够利用这一点来实现延时工作。实际上,驰名的 zookeeper 客户端 curator 就提供了基于 zookeeper 的延时工作 API,明天就从源码的角度带大家理解下 curator 是如何应用 zookeeper 实现延时工作的。不过须要提前阐明的是,应用 zookeeper 实现延时工作不是一个很好的抉择,至多称不上优雅,题目中的优雅实现延时工作只是为了和前文响应,对于应用 zookeeper 实现延时工作的弊病,后文我会具体解释。

上手 curator

对于 zookeeper 的装置和应用这里就不介绍了,之前也推送过相干文章了,如果对 zookeeper 不理解的,能够翻下历史记录看下。接下来间接进入主题,首先来体验一把 curator 的延时工作 API。

首先是工作消费者:

public class DelayTaskConsumer  implements QueueConsumer<String>{
 @Override
 public void consumeMessage(String message) throws Exception {System.out.println(MessageFormat.format("公布资讯。id - {0} , timeStamp - {1} ," +
             "threadName - {2}",message,System.currentTimeMillis(),Thread.currentThread().getName()));
 }
 @Override
 public void stateChanged(CuratorFramework client, ConnectionState newState) {System.out.println(MessageFormat.format("State change . New State is - {0}",newState));
 }
}

curator 的消费者须要实现 QueueConsumer 接口,在这里咱们做的逻辑就是拿到工作形容(这里简略起见,工作形容就是资讯 id),而后公布相应的资讯。

接下来看下工作生产者:

public class DelayTaskProducer {

   private static final String CONNECT_ADDRESS="study-machine:32783";

   private static final int SESSION_OUTTIME = 5000;

   private static final String NAMESPACE = "delayTask";

   private static final String QUEUE_PATH = "/queue";

   private static final String LOCK_PATH = "/lock";

   private CuratorFramework curatorFramework;

   private DistributedDelayQueue<String> delayQueue;

   {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
       curatorFramework= CuratorFrameworkFactory.builder().connectString(CONNECT_ADDRESS)
               .sessionTimeoutMs(SESSION_OUTTIME).retryPolicy(retryPolicy)
               .namespace(NAMESPACE).build();
       curatorFramework.start();
       delayQueue= QueueBuilder.builder(curatorFramework, new DelayTaskConsumer(),
               new DelayTaskSerializer(), QUEUE_PATH).lockPath(LOCK_PATH).buildDelayQueue();
       try {delayQueue.start();
       }catch (Exception e){e.printStackTrace();
       }
   }

   public void produce(String id,long timeStamp){
       try {delayQueue.put(id,timeStamp);
       }catch (Exception e){e.printStackTrace();
       }
   }

}

工作生产者次要有 2 个逻辑,一个是在结构代码块中初始化 curator 的延时工作队列,另一个是提供一个 produce 办法供内部往队列里放延时工作。

在初始化延时工作时,须要传入一个字节数组与工作形容实体之间的序列化器,这里简略地将工作形容解决成字符串:

public class DelayTaskSerializer implements QueueSerializer<String> {
   @Override
   public byte[] serialize(String item) {return item.getBytes();
   }
   @Override
   public String deserialize(byte[] bytes) {return new String(bytes);
   }
}

最初写一个客户端测一下:

public class DelayTaskTest {public static void main(String[] args) throws Exception{DelayTaskProducer producer=new DelayTaskProducer();
       long now=new Date().getTime();
       System.out.println(MessageFormat.format("start time - {0}",now));
       producer.produce("1",now+TimeUnit.SECONDS.toMillis(5));
       producer.produce("2",now+TimeUnit.SECONDS.toMillis(10));
       producer.produce("3",now+TimeUnit.SECONDS.toMillis(15));
       producer.produce("4",now+TimeUnit.SECONDS.toMillis(20));
       producer.produce("5",now+TimeUnit.SECONDS.toMillis(2000));
       TimeUnit.HOURS.sleep(1);
   }
}

客户端比较简单,就是往延时队列里放 5 个工作,其中最初一个工作的执行工夫比拟晚,次要是为了察看 curator 到底往 zookeeper 里放了些啥。运行程序,后果如下:

接下来咱们看下 zookeeper 里到底存了哪些信息:

[zk: localhost(CONNECTED) 2] ls /
[delayTask, zookeeper]

其中,zookeeper 节点是 zookeeper 自带的,除了 zookeeper 之后,还有一个 delayTask 节点,这个节点就是咱们在生产者里设置的命名空间 NAMESPACE。因为同一个 zookeeper 集群可能会被用于不同的延时队列,NAMESPACE 的作用就是用来辨别不同延时队列的。再看看 NAMESPACE 里是啥:

[zk: localhost(CONNECTED) 3] ls /delayTask
[lock, queue]

能够看到,有 2 个子节点:lock 跟 queue,别离是咱们在生产者中设置的分布式锁门路 LOCK_PATH 和队列门路 QUEUE_PATH。因为同一个延时队列可能会被不同线程监听,所以为了保障工作只被一个线程执行,zookeeper 在工作到期的时候须要申请到分布式锁后能力执行工作。接下来咱们重点看下 queue 节点下有什么:

[zk: localhost(CONNECTED) 7] ls /delayTask/queue
[queue-|165B92FCD69|0000000014]

发现外面只有一个子节点,咱们猜测应该就是咱们刚刚放到延时队列外面的还未执行的工作,咱们接着看看这个子节点上面还有没有子节点:

[zk: localhost(CONNECTED) 8] ls /delayTask/queue/queue-|165B92FCD69|0000000014
[]

发现没有了。

那咱们就看看 queue-|165B92FCD69|0000000014 这个节点外面放了什么数据:

[zk: localhost(CONNECTED) 9] get /delayTask/queue/queue-|165B92FCD69|0000000014
5
cZxid = 0x3d
ctime = Sat Sep 08 12:20:41 GMT 2018
mZxid = 0x3d
mtime = Sat Sep 08 12:20:41 GMT 2018
pZxid = 0x3d
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 0

能够发现放的是工作形容,也就是资讯 id——5。到这里咱们就会晓得了,zookeeper 把工作形容放到了相应工作节点下了,那么工作执行工夫放到哪里了呢?因为 queue-|165B92FCD69|0000000014 并没有子节点,所以咱们能够猜测工作执行工夫放在了节点名称上了。察看节点名称,queue 只是一个前缀,没什么信息量。0000000014 应该是节点序号(这里也能够猜想 zookeeper 用来寄存工作的节点是程序节点)。那么就只剩下 165B92FCD69 了,这个看上去并不像工夫戳或者日期,然而外面有字母,能够猜想会不会是工夫戳的十六进制示意。咱们将其转化为十进制看下:

@Test
   public void test(){long number = Long.parseLong("165B92FCD69", 16);
       System.out.println(number);
       System.out.println(new Date(number));
   }

能够转化为十进制,而后将十进制数转化成日期,的确也是咱们在一开始设置的工作执行工夫。这样一来就大略分明了 curator 是怎么利用 zookeeper 来存储延时工作的了:将工作执行工夫存储在节点名称中,将工作形容存储在节点相应的数据中。

那么到底是不是这样的呢?接下来咱们看下 curator 的源码就晓得了。

curator 源码解析

1.DistributedDelayQueue 类

curator 延时工作的入口就是 DistributedDelayQueue 类的 start 办法了。咱们先不说 start 办法,先来看看 DistributedDelayQueue 类有哪些属性:

private final DistributedQueue<T>      queue;


   DistributedDelayQueue
       (
           CuratorFramework client,
           QueueConsumer<T> consumer,
           QueueSerializer<T> serializer,
           String queuePath,
           ThreadFactory threadFactory,
           Executor executor,
           int minItemsBeforeRefresh,
           String lockPath,
           int maxItems,
           boolean putInBackground,
           int finalFlushMs
       )
   {Preconditions.checkArgument(minItemsBeforeRefresh >= 0, "minItemsBeforeRefresh cannot be negative");

       queue = new DistributedQueue<T>
       (
           client,
           consumer,
           serializer,
           queuePath,
           threadFactory,
           executor,
           minItemsBeforeRefresh,
           true,
           lockPath,
           maxItems,
           putInBackground,
           finalFlushMs
       )
       {
           @Override
           protected long getDelay(String itemNode)
           {return getDelay(itemNode, System.currentTimeMillis());
           }

           private long getDelay(String itemNode, long sortTime)
           {long epoch = getEpoch(itemNode);
               return epoch - sortTime;
           }

           @Override
           protected void sortChildren(List<String> children)
           {final long sortTime = System.currentTimeMillis();
               Collections.sort
               (
                   children,
                   new Comparator<String>()
                   {
                       @Override
                       public int compare(String o1, String o2)
                       {long        diff = getDelay(o1, sortTime) - getDelay(o2, sortTime);
                           return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
                       }
                   }
               );
           }
       };
   }

这里截取一部分代码进去。实际上 DistributedDelayQueue 里只有一个 queue 属性,queue 属性是 DistributedQueue 类的实例,从名字能够看到其是一个分布式队列。不过 DistributedDelayQueue 里的 queue 比拟非凡,其是 DistributedQueue 类的匿名外部类的实例,这个匿名子类重写了 DistributedQueue 的局部办法,如:getDelay、sortChildren 等。这一点很重要,前面的代码会用到这 2 个办法。

2.DistributedDelayQueue 的入口 start 办法

接下来咱们就来看下 DistributedDelayQueue 的入口 start 办法:

/**
    * Start the queue. No other methods work until this is called
    *
    * @throws Exception startup errors
    */
   @Override
   public void     start() throws Exception
   {queue.start();
   }

能够看到,其调用的是 queue 的 start 办法。咱们跟进去看看:

@Override
   public void     start() throws Exception
   {if ( !state.compareAndSet(State.LATENT, State.STARTED) )
       {throw new IllegalStateException();
       }

       try
       {client.create().creatingParentContainersIfNeeded().forPath(queuePath);
       }
       catch (KeeperException.NodeExistsException ignore)
       {// this is OK}
       if (lockPath != null)
       {
           try
           {client.create().creatingParentContainersIfNeeded().forPath(lockPath);
           }
           catch (KeeperException.NodeExistsException ignore)
           {// this is OK}
       }

       if (!isProducerOnly || (maxItems != QueueBuilder.NOT_SET) )
       {childrenCache.start();
       }

       if (!isProducerOnly)
       {
           service.submit
               (new Callable<Object>()
                   {
                       @Override
                       public Object call()
                       {runLoop();
                           return null;
                       }
                   }
               );
       }
   }

这个办法首先是查看状态,而后创立一些必须的节点,如后面的 queue 节点和 lock 节点就是在这里创立的。

因为咱们创立 queue 的时候有传入了消费者,所以这里 isProducerOnly 为 true,故以下 2 个分支的代码都会执行:

if (!isProducerOnly || (maxItems != QueueBuilder.NOT_SET) )
       {childrenCache.start();
       }

       if (!isProducerOnly)
       {
           service.submit
               (new Callable<Object>()
                   {
                       @Override
                       public Object call()
                       {runLoop();
                           return null;
                       }
                   }
               );
       }

2.1.childrenCache.start()

先来看看第一个分支:

childrenCache.start();

从名字上看,这个 childrenCache 应该是子节点的缓存,咱们进到 start 办法里看看:

void start() throws Exception
   {sync(true);
   }

调的是 sync 办法,咱们跟进去看看:

private synchronized void sync(boolean watched) throws Exception
   {if ( watched)
       {client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
       }
       else
       {client.getChildren().inBackground(callback).forPath(path);
       }
   }

这里 watched 为 true,所以会走第一个分支。第一个分支代码的作用是在后盾去拿 path 门路下的子节点,这里的 path 就是咱们配置的 queue_path。拿到子节点后,会调用 callback 里的回调办法。咱们看下这里的 callback 做了什么:

private final BackgroundCallback  callback = new BackgroundCallback()
   {
       @Override
       public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
       {if ( event.getResultCode() == KeeperException.Code.OK.intValue())
           {setNewChildren(event.getChildren());
           }
       }
   };

能够看到,当有子节点时,会去调用 setNewChildren 办法。咱们持续跟进去:

private synchronized void setNewChildren(List<String> newChildren)
   {if ( newChildren != null)
       {Data currentData = children.get();

           children.set(new Data(newChildren, currentData.version + 1));
           notifyFromCallback();}
   }

这里就是把子节点放到缓存里,并调用 notifyFromCallback 办法:

private synchronized void notifyFromCallback()
   {notifyAll();
   }

这里就是唤醒所有期待线程。既然有唤醒,那么就肯定有期待。持续看 ChildrenCache 类的其余办法,发现在 blockingNextGetData 办法中,调用了 wait 办法:

synchronized Data blockingNextGetData(long startVersion, long maxWait, TimeUnit unit) throws InterruptedException
   {long            startMs = System.currentTimeMillis();
       boolean         hasMaxWait = (unit != null);
       long            maxWaitMs = hasMaxWait ? unit.toMillis(maxWait) : -1;
       while (startVersion == children.get().version )
       {if ( hasMaxWait)
           {long        elapsedMs = System.currentTimeMillis() - startMs;
               long        thisWaitMs = maxWaitMs - elapsedMs;
               if (thisWaitMs <= 0)
               {break;}
               wait(thisWaitMs);
           }
           else
           {wait();
           }
       }
       return children.get();}

当 blockingNextGetData 办法被调用时,会先睡眠,当有子节点到来时,期待线程才会被唤醒,进而返回以后的子节点。这个 blockingNextGetData 办法前面还会看到。

2.2.runLoop 办法

接下来咱们看下 start 办法的最初一段代码:

service.submit
               (new Callable<Object>()
                   {
                       @Override
                       public Object call()
                       {runLoop();
                           return null;
                       }
                   }
               );

这段代码次要是向线程池提交了一个 Callable,次要逻辑是 runLoop 办法。咱们进到 runLoop 办法里看看:

private void runLoop()
   {
       long         currentVersion = -1;
       long         maxWaitMs = -1;
       try
       {while ( state.get() == State.STARTED  )
           {
               try
               {ChildrenCache.Data      data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);
                   currentVersion = data.version;

                   List<String>        children = Lists.newArrayList(data.children);
                   sortChildren(children); // makes sure items are processed in the correct order

                   if (children.size() > 0 )
                   {maxWaitMs = getDelay(children.get(0));
                       if (maxWaitMs > 0)
                       {continue;}
                   }
                   else
                   {continue;}

                   processChildren(children, currentVersion);
               }
               catch (InterruptedException e)
               {
                   // swallow the interrupt as it's only possible from either a background
                   // operation and, thus, doesn't apply to this loop or the instance
                   // is being closed in which case the while test will get it
               }
           }
       }
       catch (Exception e)
       {log.error("Exception caught in background handler", e);
       }
   }

能够看到,runLoop 办法就是一个死循环,只有与服务器的状态始终是 STARTED,这个循环就不会退出。

首先看这句代码:

ChildrenCache.Data      data = (maxWaitMs > 0) ?
childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) :
childrenCache.blockingNextGetData(currentVersion);

这行代码比拟长,我把他拆成多行了。这句代码次要是去获取子节点,后面说了,当调用 blockingNextGetData 办法时,会先期待,直到有新的子节点时,才会调用 notifyAll 唤醒期待线程。

拿到子节点后就对子节点列表进行排序:

sortChildren(children); // makes sure items are processed in the correct order

sortChildren 办法是 DistributedQueue 类的办法,在一开始剖析 DistributedDelayQueue 类的时候说到,DistributedDelayQueue 类中的 queue 是一个匿名外部类实例,其重写了 getDelay 和 sortChildren 等办法,因而咱们要看通过重写的 getDelay 和 sortChildren 是怎么的,因为 sortChildren 办法依赖 getDelay 办法,因而咱们先看看 getDelay 办法:

@Override
           protected long getDelay(String itemNode)
           {return getDelay(itemNode, System.currentTimeMillis());
           }

其会去调用 getDelay 公有办法,同时传入以后工夫戳:

private long getDelay(String itemNode, long sortTime)
           {long epoch = getEpoch(itemNode);
               return epoch - sortTime;
           }

getDelay 公有办法又会去调用 getEpoch 办法:

private static long getEpoch(String itemNode)
   {int     index2 = itemNode.lastIndexOf(SEPARATOR);
       int     index1 = (index2 > 0) ? itemNode.lastIndexOf(SEPARATOR, index2 - 1) : -1;
       if ((index1 > 0) && (index2 > (index1 + 1)) )
       {
           try
           {String  epochStr = itemNode.substring(index1 + 1, index2);
               return Long.parseLong(epochStr, 16);
           }
           catch (NumberFormatException ignore)
           {// ignore}
       }
       return 0;
   }

getEpoch 办法其实就是去解析子节点名称的,后面带大家看了 zookeeper 队列门路下的子节点名称,是这种模式的:queue-|165B92FCD69|0000000014。这个办法的作用就是将其中的工作执行的工夫戳给解析进去,也就是两头的那段字符串。拿到字符串后再将十六进制转化为十进制:

Long.parseLong(epochStr, 16);

这样验证了咱们之前的猜测:curator 会把工作执行工夫编码成十六进制放到节点名称里。至于为什么要编码成十六进制,集体认为应该是为了节俭字符串长度。

咱们再回到公有办法 getDelay:

private long getDelay(String itemNode, long sortTime)
           {long epoch = getEpoch(itemNode);
               return epoch - sortTime;
           }

拿到延时工作执行工夫戳后,再跟以后工夫戳相减,得出工作执行工夫戳跟以后工夫戳的差值,这个差值决定了这个工作要不要立刻执行,如果说这个差值小于或等于 0,阐明工作曾经到了执行工夫,那么就会执行相应的工作。当然这个差值还有一个用处,就是用于排序,具体在 sortChildren 办法外面:

@Override
           protected void sortChildren(List<String> children)
           {final long sortTime = System.currentTimeMillis();
               Collections.sort
               (
                   children,
                   new Comparator<String>()
                   {
                       @Override
                       public int compare(String o1, String o2)
                       {long        diff = getDelay(o1, sortTime) - getDelay(o2, sortTime);
                           return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
                       }
                   }
               );
           }

这个 sortChildren 办法是通过重写了的匿名外部类的办法,其依据工作执行工夫与以后工夫戳的差值进行排序,越早执行的工作排在后面,这样就能够保障延时工作是按执行工夫从早到晚排序的了。

剖析完了 getDelay 和 sortChildren,咱们再回到 runLoop 办法:

ChildrenCache.Data      data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);
                   currentVersion = data.version;

                   List<String>        children = Lists.newArrayList(data.children);
                   sortChildren(children); // makes sure items are processed in the correct order

                   if (children.size() > 0 )
                   {maxWaitMs = getDelay(children.get(0));
                       if (maxWaitMs > 0)
                       {continue;}
                   }
                   else
                   {continue;}

                   processChildren(children, currentVersion);

在对子节点按执行工夫进行升序排序后,会先拿到排在最后面的子节点,判断该子节点的执行工夫与以后工夫戳的差值是否小于 0,如果小于 0,则阐明到了执行工夫,那么就会调用上面这行代码:

processChildren(children, currentVersion);

咱们跟进去看看:

private void processChildren(List<String> children, long currentVersion) throws Exception
   {final Semaphore processedLatch = new Semaphore(0);
       final boolean   isUsingLockSafety = (lockPath != null);
       int             min = minItemsBeforeRefresh;
       for (final String itemNode : children)
       {if ( Thread.currentThread().isInterrupted())
           {processedLatch.release(children.size());
               break;
           }

           if (!itemNode.startsWith(QUEUE_ITEM_NAME) )
           {log.warn("Foreign node in queue path:" + itemNode);
               processedLatch.release();
               continue;
           }

           if (min-- <= 0)
           {if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) )
               {processedLatch.release(children.size());
                   break;
               }
           }

           if (getDelay(itemNode) > 0 )
           {processedLatch.release();
               continue;
           }

           executor.execute
           (new Runnable()
               {
                   @Override
                   public void run()
                   {
                       try
                       {if ( isUsingLockSafety)
                           {processWithLockSafety(itemNode, ProcessType.NORMAL);
                           }
                           else
                           {processNormally(itemNode, ProcessType.NORMAL);
                           }
                       }
                       catch (Exception e)
                       {ThreadUtils.checkInterrupted(e);
                           log.error("Error processing message at" + itemNode, e);
                       }
                       finally
                       {processedLatch.release();
                       }
                   }
               }
           );
       }

       processedLatch.acquire(children.size());
   }

这里用信号量 Semaphore 保障了只有当所有子节点都被遍历并解决了或者线程被中断了,这个办法才会返回。如果这段程序是单线程执行的,那么不须要应用信号量也能做到这一点。然而大家看代码就晓得,这个办法在执行到期的延时工作的时候是放到线程池外面执行的,所以才须要应用信号量来保障当所有工作被遍历并解决了,这个办法才返回。

咱们重点关注延时工作的执行局部:

executor.execute
           (new Runnable()
               {
                   @Override
                   public void run()
                   {
                       try
                       {if ( isUsingLockSafety)
                           {processWithLockSafety(itemNode, ProcessType.NORMAL);
                           }
                           else
                           {processNormally(itemNode, ProcessType.NORMAL);
                           }
                       }
                       catch (Exception e)
                       {ThreadUtils.checkInterrupted(e);
                           log.error("Error processing message at" + itemNode, e);
                       }
                       finally
                       {processedLatch.release();
                       }
                   }
               }
           );

因为咱们在初始化延时队列的时候传入了 lockPath,所以实际上会走到上面这个分支:

processWithLockSafety(itemNode, ProcessType.NORMAL);

从办法名能够看到,这个形式是应用锁的形式来解决延时工作。这里顺便提一句,好的代码是自解释的,咱们仅仅看办法名就能够大略晓得这个办法是做什么的,这一点大家平时在写代码的时候要时刻牢记,因为我在公司的老零碎上曾经看到不少 method1、method2 之类的办法命名了。这里略去 1 万字……

咱们进到 processWithLockSafety 办法外面去:

@VisibleForTesting
   protected boolean processWithLockSafety(String itemNode, ProcessType type) throws Exception
   {String      lockNodePath = ZKPaths.makePath(lockPath, itemNode);
       boolean     lockCreated = false;
       try
       {client.create().withMode(CreateMode.EPHEMERAL).forPath(lockNodePath);
           lockCreated = true;

           String  itemPath = ZKPaths.makePath(queuePath, itemNode);
           boolean requeue = false;
           byte[]  bytes = null;
           if (type == ProcessType.NORMAL)
           {bytes = client.getData().forPath(itemPath);
               requeue = (processMessageBytes(itemNode, bytes) == ProcessMessageBytesCode.REQUEUE);
           }

           if (requeue)
           {client.inTransaction()
                   .delete().forPath(itemPath)
                   .and()
                   .create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(makeRequeueItemPath(itemPath), bytes)
                   .and()
                   .commit();}
           else
           {client.delete().forPath(itemPath);
           }

           return true;
       }
       catch (KeeperException.NodeExistsException ignore)
       {// another process got it}
       catch (KeeperException.NoNodeException ignore)
       {// another process got it}
       catch (KeeperException.BadVersionException ignore)
       {// another process got it}
       finally
       {if ( lockCreated)
           {client.delete().guaranteed().forPath(lockNodePath);
           }
       }

       return false;
   }

这个办法首先会申请分布式锁:

client.create().withMode(CreateMode.EPHEMERAL).forPath(lockNodePath);

这里申请锁是通过创立长期节点的形式实现的,一个工作只对应一个节点,所以只有一个 zk 客户端可能创立胜利,也就是说只有一个客户端能够拿到锁。

拿到锁后就是解决工作了,最初在 finally 块中开释分布式锁。

咱们重点看下解决工作那一块:

requeue = (processMessageBytes(itemNode, bytes) == ProcessMessageBytesCode.REQUEUE);

咱们进到 processMessageBytes 外面去:

private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception
   {
       ProcessMessageBytesCode     resultCode = ProcessMessageBytesCode.NORMAL;
       MultiItem<T>                items;
       try
       {items = ItemSerializer.deserialize(bytes, serializer);
       }
       catch (Throwable e)
       {ThreadUtils.checkInterrupted(e);
           log.error("Corrupted queue item:" + itemNode, e);
           return resultCode;
       }

       for(;;)
       {T       item = items.nextItem();
           if (item == null)
           {break;}

           try
           {consumer.consumeMessage(item);
           }
           catch (Throwable e)
           {ThreadUtils.checkInterrupted(e);
               log.error("Exception processing queue item:" + itemNode, e);
               if (errorMode.get() == ErrorMode.REQUEUE )
               {
                   resultCode = ProcessMessageBytesCode.REQUEUE;
                   break;
               }
           }
       }
       return resultCode;
   }

千呼万唤始进去,总算看到工作生产的代码了:

consumer.consumeMessage(item);

这里的 consumer 就是咱们初始化延时工作队列时传入的工作消费者了。到这里 curator 延时工作的解决逻辑就全副讲完了。其余细节大家能够本人去看下源码,这里就不细讲了。

总结

这里简略回顾下 curator 实现延时工作的逻辑:首先在生产工作的时候,将所有工作都放到同一个节点上面,其中工作执行工夫放到子节点的名称中,工作形容放到子节点的 data 中。后盾会有一个线程去扫相应队列节点下的所有子节点,客户端拿到这些子节点后会将执行工夫和工作形容解析进去,再按工作执行工夫从早到晚排序,再顺次解决到期的工作,解决完再删除相应的子节点。这就是 curator 解决延时工作的大抵流程了。

后面说了,curator 实现延时工作不是很优雅,具体不优雅在哪里呢?首先,curator 对工作执行工夫的排序不是在 zookeeper 服务端实现的,而是在客户端进行,如果说有人一次性往 zookeeper 里放了 100 万个延时工作,那么 curator 也会全副拿到客户端进行排序,这在工作数多的时候必定是有问题的。再者,zookeeper 的主要用途不是用于存储的,他不像 MySQL 或者 Redis 一样,被设计成存储系统,zookeeper 更多地是作为分布式协调系统,存储不是他的强项,所以如果你要存储的延时工作很多,用 zookeeper 来做也是不适合的。

之所以花了这么大的篇幅来介绍 curator 如何利用 zookeeper 来实现延时工作,是为了通知大家,不是只有有轮子就能够间接拿来用的,如果不关怀轮子是怎么实现的,那有一天出了问题就无从下手了。

{

               resultCode = ProcessMessageBytesCode.REQUEUE;
               break;
           }
       }
   }
   return resultCode;

}


千呼万唤始进去,总算看到工作生产的代码了:

consumer.consumeMessage(item);

这里的 consumer 就是咱们初始化延时工作队列时传入的工作消费者了。到这里 curator 延时工作的解决逻辑就全副讲完了。其余细节大家能够本人去看下源码,这里就不细讲了。

总结

这里简略回顾下 curator 实现延时工作的逻辑:首先在生产工作的时候,将所有工作都放到同一个节点上面,其中工作执行工夫放到子节点的名称中,工作形容放到子节点的 data 中。后盾会有一个线程去扫相应队列节点下的所有子节点,客户端拿到这些子节点后会将执行工夫和工作形容解析进去,再按工作执行工夫从早到晚排序,再顺次解决到期的工作,解决完再删除相应的子节点。这就是 curator 解决延时工作的大抵流程了。

后面说了,curator 实现延时工作不是很优雅,具体不优雅在哪里呢?首先,curator 对工作执行工夫的排序不是在 zookeeper 服务端实现的,而是在客户端进行,如果说有人一次性往 zookeeper 里放了 100 万个延时工作,那么 curator 也会全副拿到客户端进行排序,这在工作数多的时候必定是有问题的。再者,zookeeper 的主要用途不是用于存储的,他不像 MySQL 或者 Redis 一样,被设计成存储系统,zookeeper 更多地是作为分布式协调系统,存储不是他的强项,所以如果你要存储的延时工作很多,用 zookeeper 来做也是不适合的。

之所以花了这么大的篇幅来介绍 curator 如何利用 zookeeper 来实现延时工作,是为了通知大家,不是只有有轮子就能够间接拿来用的,如果不关怀轮子是怎么实现的,那有一天出了问题就无从下手了。

对于延时工作之 zookeeper,你学废了么?

正文完
 0