前言
实现延时工作的关键点,是要存储工作的形容和工作的执行工夫,还要能依据工作执行工夫进行排序,那么咱们可不可以应用zookeeper来实现延时工作呢?答案当然是必定的。要晓得,zookeeper的znode同样能够用来存储数据,那么咱们就能够利用这一点来实现延时工作。实际上,驰名的zookeeper客户端curator就提供了基于zookeeper的延时工作API,明天就从源码的角度带大家理解下curator是如何应用zookeeper实现延时工作的。不过须要提前阐明的是,应用zookeeper实现延时工作不是一个很好的抉择,至多称不上优雅,题目中的优雅实现延时工作只是为了和前文响应,对于应用zookeeper实现延时工作的弊病,后文我会具体解释。
上手curator
对于zookeeper的装置和应用这里就不介绍了,之前也推送过相干文章了,如果对zookeeper不理解的,能够翻下历史记录看下。接下来间接进入主题,首先来体验一把curator的延时工作API。
首先是工作消费者:
public class DelayTaskConsumer implements QueueConsumer<String>{ @Override public void consumeMessage(String message) throws Exception { System.out.println(MessageFormat.format("公布资讯。id - {0} , timeStamp - {1} , " + "threadName - {2}",message,System.currentTimeMillis(),Thread.currentThread().getName())); } @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { System.out.println(MessageFormat.format("State change . New State is - {0}",newState)); }}
curator的消费者须要实现QueueConsumer接口,在这里咱们做的逻辑就是拿到工作形容(这里简略起见,工作形容就是资讯id),而后公布相应的资讯。
接下来看下工作生产者:
public class DelayTaskProducer { private static final String CONNECT_ADDRESS="study-machine:32783"; private static final int SESSION_OUTTIME = 5000; private static final String NAMESPACE = "delayTask"; private static final String QUEUE_PATH = "/queue"; private static final String LOCK_PATH = "/lock"; private CuratorFramework curatorFramework; private DistributedDelayQueue<String> delayQueue; { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); curatorFramework= CuratorFrameworkFactory.builder().connectString(CONNECT_ADDRESS) .sessionTimeoutMs(SESSION_OUTTIME).retryPolicy(retryPolicy) .namespace(NAMESPACE).build(); curatorFramework.start(); delayQueue= QueueBuilder.builder(curatorFramework, new DelayTaskConsumer(), new DelayTaskSerializer(), QUEUE_PATH).lockPath(LOCK_PATH).buildDelayQueue(); try { delayQueue.start(); }catch (Exception e){ e.printStackTrace(); } } public void produce(String id,long timeStamp){ try { delayQueue.put(id,timeStamp); }catch (Exception e){ e.printStackTrace(); } }}
工作生产者次要有2个逻辑,一个是在结构代码块中初始化curator的延时工作队列,另一个是提供一个produce办法供内部往队列里放延时工作。
在初始化延时工作时,须要传入一个字节数组与工作形容实体之间的序列化器,这里简略地将工作形容解决成字符串:
public class DelayTaskSerializer implements QueueSerializer<String> { @Override public byte[] serialize(String item) { return item.getBytes(); } @Override public String deserialize(byte[] bytes) { return new String(bytes); }}
最初写一个客户端测一下:
public class DelayTaskTest { public static void main(String[] args) throws Exception{ DelayTaskProducer producer=new DelayTaskProducer(); long now=new Date().getTime(); System.out.println(MessageFormat.format("start time - {0}",now)); producer.produce("1",now+TimeUnit.SECONDS.toMillis(5)); producer.produce("2",now+TimeUnit.SECONDS.toMillis(10)); producer.produce("3",now+TimeUnit.SECONDS.toMillis(15)); producer.produce("4",now+TimeUnit.SECONDS.toMillis(20)); producer.produce("5",now+TimeUnit.SECONDS.toMillis(2000)); TimeUnit.HOURS.sleep(1); }}
客户端比较简单,就是往延时队列里放5个工作,其中最初一个工作的执行工夫比拟晚,次要是为了察看curator到底往zookeeper里放了些啥。运行程序,后果如下:
接下来咱们看下zookeeper里到底存了哪些信息:
[zk: localhost(CONNECTED) 2] ls /[delayTask, zookeeper]
其中,zookeeper节点是zookeeper自带的,除了zookeeper之后,还有一个delayTask节点,这个节点就是咱们在生产者里设置的命名空间NAMESPACE。因为同一个zookeeper集群可能会被用于不同的延时队列,NAMESPACE的作用就是用来辨别不同延时队列的。再看看NAMESPACE里是啥:
[zk: localhost(CONNECTED) 3] ls /delayTask[lock, queue]
能够看到,有2个子节点:lock跟queue,别离是咱们在生产者中设置的分布式锁门路LOCK_PATH和队列门路QUEUE_PATH。因为同一个延时队列可能会被不同线程监听,所以为了保障工作只被一个线程执行,zookeeper在工作到期的时候须要申请到分布式锁后能力执行工作。接下来咱们重点看下queue节点下有什么:
[zk: localhost(CONNECTED) 7] ls /delayTask/queue[queue-|165B92FCD69|0000000014]
发现外面只有一个子节点,咱们猜测应该就是咱们刚刚放到延时队列外面的还未执行的工作,咱们接着看看这个子节点上面还有没有子节点:
[zk: localhost(CONNECTED) 8] ls /delayTask/queue/queue-|165B92FCD69|0000000014[]
发现没有了。
那咱们就看看queue-|165B92FCD69|0000000014这个节点外面放了什么数据:
[zk: localhost(CONNECTED) 9] get /delayTask/queue/queue-|165B92FCD69|00000000145cZxid = 0x3dctime = Sat Sep 08 12:20:41 GMT 2018mZxid = 0x3dmtime = Sat Sep 08 12:20:41 GMT 2018pZxid = 0x3dcversion = 0dataVersion = 0aclVersion = 0ephemeralOwner = 0x0dataLength = 11numChildren = 0
能够发现放的是工作形容,也就是资讯id——5。到这里咱们就会晓得了,zookeeper把工作形容放到了相应工作节点下了,那么工作执行工夫放到哪里了呢?因为queue-|165B92FCD69|0000000014并没有子节点,所以咱们能够猜测工作执行工夫放在了节点名称上了。察看节点名称,queue只是一个前缀,没什么信息量。0000000014应该是节点序号(这里也能够猜想zookeeper用来寄存工作的节点是程序节点)。那么就只剩下165B92FCD69了,这个看上去并不像工夫戳或者日期,然而外面有字母,能够猜想会不会是工夫戳的十六进制示意。咱们将其转化为十进制看下:
@Test public void test(){ long number = Long.parseLong("165B92FCD69", 16); System.out.println(number); System.out.println(new Date(number)); }
能够转化为十进制,而后将十进制数转化成日期,的确也是咱们在一开始设置的工作执行工夫。这样一来就大略分明了curator是怎么利用zookeeper来存储延时工作的了:将工作执行工夫存储在节点名称中,将工作形容存储在节点相应的数据中。
那么到底是不是这样的呢?接下来咱们看下curator的源码就晓得了。
curator源码解析
1.DistributedDelayQueue类
curator延时工作的入口就是DistributedDelayQueue类的start办法了。咱们先不说start办法,先来看看DistributedDelayQueue类有哪些属性:
private final DistributedQueue<T> queue; DistributedDelayQueue ( CuratorFramework client, QueueConsumer<T> consumer, QueueSerializer<T> serializer, String queuePath, ThreadFactory threadFactory, Executor executor, int minItemsBeforeRefresh, String lockPath, int maxItems, boolean putInBackground, int finalFlushMs ) { Preconditions.checkArgument(minItemsBeforeRefresh >= 0, "minItemsBeforeRefresh cannot be negative"); queue = new DistributedQueue<T> ( client, consumer, serializer, queuePath, threadFactory, executor, minItemsBeforeRefresh, true, lockPath, maxItems, putInBackground, finalFlushMs ) { @Override protected long getDelay(String itemNode) { return getDelay(itemNode, System.currentTimeMillis()); } private long getDelay(String itemNode, long sortTime) { long epoch = getEpoch(itemNode); return epoch - sortTime; } @Override protected void sortChildren(List<String> children) { final long sortTime = System.currentTimeMillis(); Collections.sort ( children, new Comparator<String>() { @Override public int compare(String o1, String o2) { long diff = getDelay(o1, sortTime) - getDelay(o2, sortTime); return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0); } } ); } }; }
这里截取一部分代码进去。实际上DistributedDelayQueue里只有一个queue属性,queue属性是DistributedQueue类的实例,从名字能够看到其是一个分布式队列。不过DistributedDelayQueue里的queue比拟非凡,其是DistributedQueue类的匿名外部类的实例,这个匿名子类重写了DistributedQueue的局部办法,如:getDelay、sortChildren等。这一点很重要,前面的代码会用到这2个办法。
2.DistributedDelayQueue的入口start办法
接下来咱们就来看下DistributedDelayQueue的入口start办法:
/** * Start the queue. No other methods work until this is called * * @throws Exception startup errors */ @Override public void start() throws Exception { queue.start(); }
能够看到,其调用的是queue的start办法。咱们跟进去看看:
@Override public void start() throws Exception { if ( !state.compareAndSet(State.LATENT, State.STARTED) ) { throw new IllegalStateException(); } try { client.create().creatingParentContainersIfNeeded().forPath(queuePath); } catch ( KeeperException.NodeExistsException ignore ) { // this is OK } if ( lockPath != null ) { try { client.create().creatingParentContainersIfNeeded().forPath(lockPath); } catch ( KeeperException.NodeExistsException ignore ) { // this is OK } } if ( !isProducerOnly || (maxItems != QueueBuilder.NOT_SET) ) { childrenCache.start(); } if ( !isProducerOnly ) { service.submit ( new Callable<Object>() { @Override public Object call() { runLoop(); return null; } } ); } }
这个办法首先是查看状态,而后创立一些必须的节点,如后面的queue节点和lock节点就是在这里创立的。
因为咱们创立queue的时候有传入了消费者,所以这里isProducerOnly为true,故以下2个分支的代码都会执行:
if ( !isProducerOnly || (maxItems != QueueBuilder.NOT_SET) ) { childrenCache.start(); } if ( !isProducerOnly ) { service.submit ( new Callable<Object>() { @Override public Object call() { runLoop(); return null; } } ); }
2.1.childrenCache.start()
先来看看第一个分支:
childrenCache.start();
从名字上看,这个childrenCache应该是子节点的缓存,咱们进到start办法里看看:
void start() throws Exception { sync(true); }
调的是sync办法,咱们跟进去看看:
private synchronized void sync(boolean watched) throws Exception { if ( watched ) { client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path); } else { client.getChildren().inBackground(callback).forPath(path); } }
这里watched为true,所以会走第一个分支。第一个分支代码的作用是在后盾去拿path门路下的子节点,这里的path就是咱们配置的queue_path。拿到子节点后,会调用callback里的回调办法。咱们看下这里的callback做了什么:
private final BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { setNewChildren(event.getChildren()); } } };
能够看到,当有子节点时,会去调用setNewChildren办法。咱们持续跟进去:
private synchronized void setNewChildren(List<String> newChildren) { if ( newChildren != null ) { Data currentData = children.get(); children.set(new Data(newChildren, currentData.version + 1)); notifyFromCallback(); } }
这里就是把子节点放到缓存里,并调用notifyFromCallback办法:
private synchronized void notifyFromCallback() { notifyAll(); }
这里就是唤醒所有期待线程。既然有唤醒,那么就肯定有期待。持续看ChildrenCache类的其余办法,发现在blockingNextGetData办法中,调用了wait办法:
synchronized Data blockingNextGetData(long startVersion, long maxWait, TimeUnit unit) throws InterruptedException { long startMs = System.currentTimeMillis(); boolean hasMaxWait = (unit != null); long maxWaitMs = hasMaxWait ? unit.toMillis(maxWait) : -1; while ( startVersion == children.get().version ) { if ( hasMaxWait ) { long elapsedMs = System.currentTimeMillis() - startMs; long thisWaitMs = maxWaitMs - elapsedMs; if ( thisWaitMs <= 0 ) { break; } wait(thisWaitMs); } else { wait(); } } return children.get(); }
当blockingNextGetData办法被调用时,会先睡眠,当有子节点到来时,期待线程才会被唤醒,进而返回以后的子节点。这个blockingNextGetData办法前面还会看到。
2.2.runLoop办法
接下来咱们看下start办法的最初一段代码:
service.submit ( new Callable<Object>() { @Override public Object call() { runLoop(); return null; } } );
这段代码次要是向线程池提交了一个Callable,次要逻辑是runLoop办法。咱们进到runLoop办法里看看:
private void runLoop() { long currentVersion = -1; long maxWaitMs = -1; try { while ( state.get() == State.STARTED ) { try { ChildrenCache.Data data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion); currentVersion = data.version; List<String> children = Lists.newArrayList(data.children); sortChildren(children); // makes sure items are processed in the correct order if ( children.size() > 0 ) { maxWaitMs = getDelay(children.get(0)); if ( maxWaitMs > 0 ) { continue; } } else { continue; } processChildren(children, currentVersion); } catch ( InterruptedException e ) { // swallow the interrupt as it's only possible from either a background // operation and, thus, doesn't apply to this loop or the instance // is being closed in which case the while test will get it } } } catch ( Exception e ) { log.error("Exception caught in background handler", e); } }
能够看到,runLoop办法就是一个死循环,只有与服务器的状态始终是STARTED,这个循环就不会退出。
首先看这句代码:
ChildrenCache.Data data = (maxWaitMs > 0) ?childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) :childrenCache.blockingNextGetData(currentVersion);
这行代码比拟长,我把他拆成多行了。这句代码次要是去获取子节点,后面说了,当调用blockingNextGetData办法时,会先期待,直到有新的子节点时,才会调用notifyAll唤醒期待线程。
拿到子节点后就对子节点列表进行排序:
sortChildren(children); // makes sure items are processed in the correct order
sortChildren办法是DistributedQueue类的办法,在一开始剖析DistributedDelayQueue类的时候说到,DistributedDelayQueue类中的queue是一个匿名外部类实例,其重写了getDelay和sortChildren等办法,因而咱们要看通过重写的getDelay和sortChildren是怎么的,因为sortChildren办法依赖getDelay办法,因而咱们先看看getDelay办法:
@Override protected long getDelay(String itemNode) { return getDelay(itemNode, System.currentTimeMillis()); }
其会去调用getDelay公有办法,同时传入以后工夫戳:
private long getDelay(String itemNode, long sortTime) { long epoch = getEpoch(itemNode); return epoch - sortTime; }
getDelay公有办法又会去调用getEpoch办法:
private static long getEpoch(String itemNode) { int index2 = itemNode.lastIndexOf(SEPARATOR); int index1 = (index2 > 0) ? itemNode.lastIndexOf(SEPARATOR, index2 - 1) : -1; if ( (index1 > 0) && (index2 > (index1 + 1)) ) { try { String epochStr = itemNode.substring(index1 + 1, index2); return Long.parseLong(epochStr, 16); } catch ( NumberFormatException ignore ) { // ignore } } return 0; }
getEpoch办法其实就是去解析子节点名称的,后面带大家看了zookeeper队列门路下的子节点名称,是这种模式的:queue-|165B92FCD69|0000000014。这个办法的作用就是将其中的工作执行的工夫戳给解析进去,也就是两头的那段字符串。拿到字符串后再将十六进制转化为十进制:
Long.parseLong(epochStr, 16);
这样验证了咱们之前的猜测:curator会把工作执行工夫编码成十六进制放到节点名称里。至于为什么要编码成十六进制,集体认为应该是为了节俭字符串长度。
咱们再回到公有办法getDelay:
private long getDelay(String itemNode, long sortTime) { long epoch = getEpoch(itemNode); return epoch - sortTime; }
拿到延时工作执行工夫戳后,再跟以后工夫戳相减,得出工作执行工夫戳跟以后工夫戳的差值,这个差值决定了这个工作要不要立刻执行,如果说这个差值小于或等于0,阐明工作曾经到了执行工夫,那么就会执行相应的工作。当然这个差值还有一个用处,就是用于排序,具体在sortChildren办法外面:
@Override protected void sortChildren(List<String> children) { final long sortTime = System.currentTimeMillis(); Collections.sort ( children, new Comparator<String>() { @Override public int compare(String o1, String o2) { long diff = getDelay(o1, sortTime) - getDelay(o2, sortTime); return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0); } } ); }
这个sortChildren办法是通过重写了的匿名外部类的办法,其依据工作执行工夫与以后工夫戳的差值进行排序,越早执行的工作排在后面,这样就能够保障延时工作是按执行工夫从早到晚排序的了。
剖析完了getDelay和sortChildren,咱们再回到runLoop办法:
ChildrenCache.Data data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion); currentVersion = data.version; List<String> children = Lists.newArrayList(data.children); sortChildren(children); // makes sure items are processed in the correct order if ( children.size() > 0 ) { maxWaitMs = getDelay(children.get(0)); if ( maxWaitMs > 0 ) { continue; } } else { continue; } processChildren(children, currentVersion);
在对子节点按执行工夫进行升序排序后,会先拿到排在最后面的子节点,判断该子节点的执行工夫与以后工夫戳的差值是否小于0,如果小于0,则阐明到了执行工夫,那么就会调用上面这行代码:
processChildren(children, currentVersion);
咱们跟进去看看:
private void processChildren(List<String> children, long currentVersion) throws Exception { final Semaphore processedLatch = new Semaphore(0); final boolean isUsingLockSafety = (lockPath != null); int min = minItemsBeforeRefresh; for ( final String itemNode : children ) { if ( Thread.currentThread().isInterrupted() ) { processedLatch.release(children.size()); break; } if ( !itemNode.startsWith(QUEUE_ITEM_NAME) ) { log.warn("Foreign node in queue path: " + itemNode); processedLatch.release(); continue; } if ( min-- <= 0 ) { if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) ) { processedLatch.release(children.size()); break; } } if ( getDelay(itemNode) > 0 ) { processedLatch.release(); continue; } executor.execute ( new Runnable() { @Override public void run() { try { if ( isUsingLockSafety ) { processWithLockSafety(itemNode, ProcessType.NORMAL); } else { processNormally(itemNode, ProcessType.NORMAL); } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); log.error("Error processing message at " + itemNode, e); } finally { processedLatch.release(); } } } ); } processedLatch.acquire(children.size()); }
这里用信号量Semaphore保障了只有当所有子节点都被遍历并解决了或者线程被中断了,这个办法才会返回。如果这段程序是单线程执行的,那么不须要应用信号量也能做到这一点。然而大家看代码就晓得,这个办法在执行到期的延时工作的时候是放到线程池外面执行的,所以才须要应用信号量来保障当所有工作被遍历并解决了,这个办法才返回。
咱们重点关注延时工作的执行局部:
executor.execute ( new Runnable() { @Override public void run() { try { if ( isUsingLockSafety ) { processWithLockSafety(itemNode, ProcessType.NORMAL); } else { processNormally(itemNode, ProcessType.NORMAL); } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); log.error("Error processing message at " + itemNode, e); } finally { processedLatch.release(); } } } );
因为咱们在初始化延时队列的时候传入了lockPath ,所以实际上会走到上面这个分支:
processWithLockSafety(itemNode, ProcessType.NORMAL);
从办法名能够看到,这个形式是应用锁的形式来解决延时工作。这里顺便提一句,好的代码是自解释的,咱们仅仅看办法名就能够大略晓得这个办法是做什么的,这一点大家平时在写代码的时候要时刻牢记,因为我在公司的老零碎上曾经看到不少method1、method2之类的办法命名了。这里略去1万字……
咱们进到processWithLockSafety办法外面去:
@VisibleForTesting protected boolean processWithLockSafety(String itemNode, ProcessType type) throws Exception { String lockNodePath = ZKPaths.makePath(lockPath, itemNode); boolean lockCreated = false; try { client.create().withMode(CreateMode.EPHEMERAL).forPath(lockNodePath); lockCreated = true; String itemPath = ZKPaths.makePath(queuePath, itemNode); boolean requeue = false; byte[] bytes = null; if ( type == ProcessType.NORMAL ) { bytes = client.getData().forPath(itemPath); requeue = (processMessageBytes(itemNode, bytes) == ProcessMessageBytesCode.REQUEUE); } if ( requeue ) { client.inTransaction() .delete().forPath(itemPath) .and() .create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(makeRequeueItemPath(itemPath), bytes) .and() .commit(); } else { client.delete().forPath(itemPath); } return true; } catch ( KeeperException.NodeExistsException ignore ) { // another process got it } catch ( KeeperException.NoNodeException ignore ) { // another process got it } catch ( KeeperException.BadVersionException ignore ) { // another process got it } finally { if ( lockCreated ) { client.delete().guaranteed().forPath(lockNodePath); } } return false; }
这个办法首先会申请分布式锁:
client.create().withMode(CreateMode.EPHEMERAL).forPath(lockNodePath);
这里申请锁是通过创立长期节点的形式实现的,一个工作只对应一个节点,所以只有一个zk客户端可能创立胜利,也就是说只有一个客户端能够拿到锁。
拿到锁后就是解决工作了,最初在finally块中开释分布式锁。
咱们重点看下解决工作那一块:
requeue = (processMessageBytes(itemNode, bytes) == ProcessMessageBytesCode.REQUEUE);
咱们进到processMessageBytes外面去:
private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception { ProcessMessageBytesCode resultCode = ProcessMessageBytesCode.NORMAL; MultiItem<T> items; try { items = ItemSerializer.deserialize(bytes, serializer); } catch ( Throwable e ) { ThreadUtils.checkInterrupted(e); log.error("Corrupted queue item: " + itemNode, e); return resultCode; } for(;;) { T item = items.nextItem(); if ( item == null ) { break; } try { consumer.consumeMessage(item); } catch ( Throwable e ) { ThreadUtils.checkInterrupted(e); log.error("Exception processing queue item: " + itemNode, e); if ( errorMode.get() == ErrorMode.REQUEUE ) { resultCode = ProcessMessageBytesCode.REQUEUE; break; } } } return resultCode; }
千呼万唤始进去,总算看到工作生产的代码了:
consumer.consumeMessage(item);
这里的consumer就是咱们初始化延时工作队列时传入的工作消费者了。到这里curator延时工作的解决逻辑就全副讲完了。其余细节大家能够本人去看下源码,这里就不细讲了。
总结
这里简略回顾下curator实现延时工作的逻辑:首先在生产工作的时候,将所有工作都放到同一个节点上面,其中工作执行工夫放到子节点的名称中,工作形容放到子节点的data中。后盾会有一个线程去扫相应队列节点下的所有子节点,客户端拿到这些子节点后会将执行工夫和工作形容解析进去,再按工作执行工夫从早到晚排序,再顺次解决到期的工作,解决完再删除相应的子节点。这就是curator解决延时工作的大抵流程了。
后面说了,curator实现延时工作不是很优雅,具体不优雅在哪里呢?首先,curator对工作执行工夫的排序不是在zookeeper服务端实现的,而是在客户端进行,如果说有人一次性往zookeeper里放了100万个延时工作,那么curator也会全副拿到客户端进行排序,这在工作数多的时候必定是有问题的。再者,zookeeper的主要用途不是用于存储的,他不像MySQL或者Redis一样,被设计成存储系统,zookeeper更多地是作为分布式协调系统,存储不是他的强项,所以如果你要存储的延时工作很多,用zookeeper来做也是不适合的。
之所以花了这么大的篇幅来介绍curator如何利用zookeeper来实现延时工作,是为了通知大家,不是只有有轮子就能够间接拿来用的,如果不关怀轮子是怎么实现的,那有一天出了问题就无从下手了。
{
resultCode = ProcessMessageBytesCode.REQUEUE; break; } } } return resultCode;
}
千呼万唤始进去,总算看到工作生产的代码了:
consumer.consumeMessage(item);
这里的consumer就是咱们初始化延时工作队列时传入的工作消费者了。到这里curator延时工作的解决逻辑就全副讲完了。其余细节大家能够本人去看下源码,这里就不细讲了。
总结
这里简略回顾下curator实现延时工作的逻辑:首先在生产工作的时候,将所有工作都放到同一个节点上面,其中工作执行工夫放到子节点的名称中,工作形容放到子节点的data中。后盾会有一个线程去扫相应队列节点下的所有子节点,客户端拿到这些子节点后会将执行工夫和工作形容解析进去,再按工作执行工夫从早到晚排序,再顺次解决到期的工作,解决完再删除相应的子节点。这就是curator解决延时工作的大抵流程了。
后面说了,curator实现延时工作不是很优雅,具体不优雅在哪里呢?首先,curator对工作执行工夫的排序不是在zookeeper服务端实现的,而是在客户端进行,如果说有人一次性往zookeeper里放了100万个延时工作,那么curator也会全副拿到客户端进行排序,这在工作数多的时候必定是有问题的。再者,zookeeper的主要用途不是用于存储的,他不像MySQL或者Redis一样,被设计成存储系统,zookeeper更多地是作为分布式协调系统,存储不是他的强项,所以如果你要存储的延时工作很多,用zookeeper来做也是不适合的。
之所以花了这么大的篇幅来介绍curator如何利用zookeeper来实现延时工作,是为了通知大家,不是只有有轮子就能够间接拿来用的,如果不关怀轮子是怎么实现的,那有一天出了问题就无从下手了。
对于延时工作之zookeeper,你学废了么?