基于Python实现环形队列高效定时器

定时器Python实现代码

import timeimport redisimport multiprocessingclass Base:    """    redis配置    """    redis_conf = {}    """    环形队列应用redis进行存储    """    _ri = None    """    定时器轮盘大小    """    slot_num = 15    """    存储环形队列应用的redis缓存key    """    cache_key = 'wheel:slot_'    def __init__(self, **kwargs):        for k in kwargs:            if hasattr(self, k):                setattr(self, k, kwargs[k])        self._ri = redis.Redis(**self.redis_conf)class Timer(Base):    """    以后slot的下标    """    _current = 0    """    事件处理    """    event_handler = None    def worker(self):        """        # TODO 测试每个卡槽有1W事件ID的解决效率        独立过程,散发事件id给事件处理器        :return:        """        key = self.cache_key + str(self._current)        # 获取以后卡槽中须要触发的事件ID        event_ids = self._ri.zrangebyscore(key, 0, 0)        # 删除以后卡槽中须要触发的事件ID        self._ri.zremrangebyscore(key, 0, 0)        # 把以后卡槽剩下的事件ID全副遍历进去,缩小一次残余循环次数        surplus_event_ids = self._ri.zrange(key, 0, -1)        for mid in surplus_event_ids:            self._ri.zincrby(key, mid, -1)        # 把事件ID转交给handler解决        for mid in event_ids:            self.event_handler(eid=mid)        exit(0)    def run(self):        """        启动过程        :return:        """        while True:            p = multiprocessing.Process(target=self.worker)            p.start()            time.sleep(1)            self._current = int(time.time()) % self.slot_numclass TimerEvent(Base):    def add(self, event_id, emit_time):        """        增加事件ID到定时器        :param event_id: 事件ID        :param emit_time: 触发工夫        :return:        """        current_time = int(time.time())        diff = emit_time - current_time        if diff > 0:            # 计算循环次数            cycle = int(diff / self.slot_num)            # 计算要存入的slot的索引            index = (diff % self.slot_num + current_time % self.slot_num) % self.slot_num            res = self._ri.zadd(self.cache_key + str(index), str(event_id), cycle)            return True if res else False        return False    # TODO 批量增加同一时间,不同事件ID    # TODO 批量增加不同工夫,不同事件ID

通过环形队列实现高效工作触发的设计说明

  1. redis汇合【slot】
  • 以redis多个有法则的键名的有序汇合组成环形数组
key_1key_2....key_n
  • 有序汇合

命令

ZADD key score member
有序汇合中蕴含两局部, 一个是score, 一个是memberscore作为残余循环次数  meber作为事件ID
  1. python多过程
  • 计算以后工夫应该解决的卡槽

    以后slot索引 = (以后工夫 % 卡槽总数 + 以后工夫戳 % 卡槽总数) % 卡槽总数

"%"为取余数操作
  • 创立独立子过程解决

    以后子过程须要疾速读取的残余循环次数为0事件ID

    删除以后slot已取出的事件ID

    开始把事件ID顺次转交给事件handler解决

利用阐明

  1. 启动定时器
import Timerimport timedef event_handler(eid):    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), eid)t = Timer(redis_conf={    'host': '127.0.0.1',    'port': 6379,    'password': '123456',    'db': 0}, event_handler=event_handler)times = int(time.time())print('Current Time is ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(times)))t.run()
  1. 增加须要延时触发事件ID
import TimerEventimport timete = TimerEvent(redis_conf={    'host': '127.0.0.1',    'port': 6379,    'password': '123456',    'db': 0})times = int(time.time())print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(times)))after_seconds_alert = 20for x in range(100):    te.add(x, times + after_seconds_alert + x)print('Firs Emit will happened at ' + time.strftime(    'Start:%Y-%m-%d %H:%M:%S',    time.localtime(times + after_seconds_alert)))
参考文章 10w定时工作,如何高效触发超时