关于算法:编程参考连载-Snowflake-算法原理与对应的-Python-实现

5次阅读

共计 3337 个字符,预计需要花费 9 分钟才能阅读完成。

Twitter 于 2010 年开源了外部团队在用的一款全局惟一 ID 生成算法 Snowflake,翻译过去叫做雪花算法。Snowflake 不借助数据库,可间接由编程语言生成,它间断生成的 3 个 ID 看起来像这样 563583455628754944、563583466173235200、563583552944996352。

Snowflake 以 64 bit 来存储组成 ID 的 4 个局部:

• 最高位占 1 bit,值固定为 0,以保障生成的 ID 为负数;• 中位占 41 bit,值为毫秒级工夫戳;• 中下位占 10 bit,值为工作机器的 ID,值的下限为 1024;• 末位占 12 bit,值为以后毫秒内生成的不同 ID,值的下限为 4096;

这样的设计容许在 1 毫秒内生成最多 4096 个 ID,同时因为中位是工夫戳,使得生成的 ID 是有序递增的。

Snowflake 的代码实现

代码实现中有一个重要的知识点——位运算。不理解位运算的敌人请浏览本书(《Python 编程参考》)的 位运算 章节开展学习。

首先咱们引入必要的库,并设定中下位和末位的长度:

import time
import logging
# 调配地位
WORKER_BITS = 5
DATACENTER_BITS = 5
SEQUENCE_BITS = 12

这里的中下位蕴含 5 位机房 ID 和 5 位机器 ID,它们一起组成 10 位的中下位。接着通过中下位的长度计算机房和机器的上限值;

# 设定设施数量下限
WORKER_UPPER_LIMIT = -1 ^ (-1 << WORKER_BITS)
DATACENTER_UPPER_LIMIT = -1 ^ (-1 << DATACENTER_BITS)

下面提到过这是一组 64 位长度的组合,在获取到每一段之后咱们须要将它们按位组合到一起,所以这里须要计算出地位的偏移量:

# 组合时的位运算偏移量
WORKER_SHIFT = SEQUENCE_BITS
DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_BITS
TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_BITS + DATACENTER_BITS

而后设定掩码和元工夫:

SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS) # 掩码
EPOCH = 1577808001000 # 元工夫戳 此处元设为 2020-01-01 00:00:01

依照 Snowflake 设定的 41 位长度,工夫戳最多也只能用 70 年,如果从个 1970 年开始,那么就白白浪费了几十年,所以这里的元工夫能够设定为代码编写的工夫或者近年工夫。

版权水印 微信公众号 Python 编程参考

所有准备就绪后,开始设计 Snowflake 类的构造,这个算法波及工夫边界、工夫戳生成、编号获取操作,代码根本构造如下:

class SnowFlake:
 def __init__(self, data_center_id, worker_id, sequence=0):
 pass
 def _timestamp():
 """指定位数的工夫戳"""
 pass
 def take(self):
 """获取一个编号"""
 pass
 def _generate(self):
 """生成一个编号"""
 def _wait_next_time(self, last_timestamp):
 """等到下一次单位工夫"""
 pass

一些初始化工作和根本边界查看工作在 init 函数中发展,例如查看 WORKER ID 和 DATACENTER ID 的边界,ID 的初始化等,对应代码如下:

 def __init__(self, data_center_id, worker_id, sequence=0):
 # 编号下限查看
 if worker_id > WORKER_UPPER_LIMIT:
 raise ValueError('WORKER ID 高于下限')
 if worker_id < 0:
 raise ValueError('WORKER ID 低于上限')
 if data_center_id > DATACENTER_UPPER_LIMIT:
 raise ValueError('DATA CENTER ID 高于下限')
 if data_center_id < 0:
 raise ValueError('DATA CENTER ID 低于上限')
 self.worker_id = worker_id
 self.datacenter_id = data_center_id
 self.sequence = sequence
 self.last_timestamp = -1 # 最近一次生成编号的工夫戳

工夫戳的生成很简略,然而咱们编写代码的时候要留神函数繁多职责准则,因而把它独自拿进去做一个函数:

 @staticmethod
 def _timestamp(n=1e3):
 """指定位数的工夫戳"""
 return int(time.time() * n)

如果单位工夫内生成的数超过下限,就须要等到下一个单位工夫,对应代码如下:

 def _wait_next_time(self, last_timestamp):
 """等到下一次单位工夫"""
 timestamp = self._timestamp()
 while timestamp <= last_timestamp:
 timestamp = self._timestamp()
 return timestamp

取号时波及的操作比拟多,例如校验工夫是否回拨、单位工夫生成的数是否超限、生成账号等,因而须要拆分称多个小函数,遵循繁多职责准则:

 def take(self) -> int:
 """获取一个编号"""
 timestamp = self._timestamp()
 self._check(timestamp)
 self.last_timestamp = timestamp # 更新最近一次生成编号的工夫戳
 return self._generate(timestamp)
 def _generate(self, timestamp):
 """生成一个编号"""
 number = ((timestamp - EPOCH) << TIMESTAMP_LEFT_SHIFT) | (self.datacenter_id << DATACENTER_ID_SHIFT) | 
 (self.worker_id << WORKER_SHIFT) | self.sequence
 return number
 def _check(self, timestamp):
 """超限查看"""
 self._time_back_off_check(timestamp)
 self._number_check(timestamp)
 def _number_check(self, timestamp):
 """ 数超限查看
 查看以后工夫生成的编号是否超过下限 超过下限则等到下一个工夫生成
 """
 if timestamp == self.last_timestamp:
 self.sequence = (self.sequence + 1) & SEQUENCE_MASK
 if self.sequence == 0:
 timestamp = self._wait_next_time(self.last_timestamp)
 else:
 self.sequence = 0
 def _time_back_off_check(self, timestamp):
 """查看时钟回拨"""
 if timestamp < self.last_timestamp:
 logging.error('发现时钟回退,记录到最近一次的工夫戳为 {}'.format(self.last_timestamp))
 raise Exception("时钟回拨异样")

至此,Snowflake 算法编写实现。残缺代码下如图所示:

carbon.png

这里是微信公众号 Python 编程参考,如果你感觉这篇文章对你有帮忙,请来关注我哦。点击返回作者韦世东的技术专栏 www.weishidong.com,看更多有用常识。热门文章一览:

[1]《几个有效应对 35 岁危机的方法》
[2]《工程师绘图与设计实际指南》
[3]《继续交付实际 – GitHub Actions 部署 Node 利用到云服务器》
[4]《SSH 免明码 / 免用户名 / 免 IP 登录云服务器实际》
[5]《ElasticSearch 定时删除指定天数的数据实际》

正文完
 0