共计 4451 个字符,预计需要花费 12 分钟才能阅读完成。
1. 雪花算法
图片来自 https://zhuanlan.zhihu.com/p/…
- 生成的序列号是由 64 位示意
- 最高位为 0,示意是负数
- 第 2 到第 42 位示意工夫距离,其计算是应用以后工夫减去一个起始工夫失去一个工夫距离,41 位大概能够保留 69 年的工夫范畴,也就是说起始工夫是 2021 年的话,大概在 2090 年用完,足够应用了
- 第 43 位到第 52 位为具体服务的 id,服务的 id 是应用 etcd 来实现全局惟一的也能够应用 redis 进行实现,最多 1024 个,也就是说所有服务最多 1024 个,包含起来多个的服务
- 第 53 位到第 64 位为同一时间下递增的序列号
2. 源码如下,钻研能够看正文
实现只放在一个源文件外面了,具体的能够进行放在不同的包中进行调用,雪花算法的实现能够独自放在一个 worker 包中供所有服务调用
package main
import (
"context"
"errors"
"go.etcd.io/etcd/clientv3"
"log"
"strconv"
"sync"
"time"
)
const(
WORKERIDBits = 10 //wokerId 占 10 位 bit
SEQUENCEBITS = 12 // 序列号占的 bit 位数
MAXSEQUENCE = int64(-1) ^ (int64(-1) << SEQUENCEBITS) // 序列号的最大值
MAXWORKERID = int64(-1) ^ (int64(-1) << WORKERIDBits) //workerId 的最大值
TIMESTAMP_OFFSET = uint(22) // 工夫戳的偏移位数
WORKERID_OFFSET = uint(12) //workerId 的偏移位数
TIME_START_STAMP = int64(1589923200000) // 起始工夫 2020-05-20 08:00:00 +0800 CST
)
//-----------------------------workerId---------------------------
var CurrentWorkNodeNum string // 以后节点 number 节点 number 从 1 开始 最大值为 1024
var WokerNodePrefix = "worker" // 节点 key 前缀 节点值为 CurrentWorkNodeNum 例如 worker1 = 1 worker2 = 2
var wg sync.WaitGroup
func WorkerId() error {if len(CurrentWorkNodeNum) != 0 { //CurrentWorkNodeNum 如果曾经初始化过了,间接返回
return errors.New("CurrentWorkNodeNum inited")
}
client, err := clientv3.New(clientv3.Config{Endpoints: []string{"http://192.168.56.111:2379"}, //etcd 端 能够抽离进去
DialTimeout: 2 * time.Second,
})
if err != nil {log.Println("create client err:",err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp,err := client.Get(ctx, WokerNodePrefix,clientv3.WithPrefix()) // 获取所有前缀为 worker 的节点
cancel()
if err != nil {return errors.New("get prefix worker node error")
}
existNodeMap := make(map[int]int) // 定义一个 map,保留曾经存在的节点
for _,ev := range resp.Kvs {num, _ := strconv.Atoi(string(ev.Value))
existNodeMap[num] = num //put 到 existNodeMap 中
log.Printf("%s:%s \n",ev.Key,ev.Value)
}
count := 1 // 从 1 到 1024 找最小的 number
for ;count < 1025; count++ {if _, ok := existNodeMap[count];!ok { // 如果不存在,就会间接 break
CurrentWorkNodeNum = strconv.Itoa(count)
break
}
}
if count == 1024 { // 代表 1024 个节点都曾经用完了,或者局部节点曾经挂掉了,而后 key 的租期还没有完结,能够重新启动
return errors.New("服务节点数目大于 1024")
}
go ActiveCurrentWorkerNode(client) // 启动一个协程始终激活以后 key, 如果以后服务挂了,key 就会在租期完结后查问不到了
return nil
}
func ActiveCurrentWorkerNode(client *clientv3.Client){
for {leasetime := int64(60) // 租期工夫
sleeptime := 50 // 以后协程睡眠工夫,小于租期工夫即可
lease := clientv3.NewLease(client)
log.Println("active currerntNode :",CurrentWorkNodeNum)
if leaseRes,err := lease.Grant(context.TODO(),leasetime);err != nil {panic(err)
}else {_, err := client.Put(context.Background(), WokerNodePrefix+CurrentWorkNodeNum, CurrentWorkNodeNum,clientv3.WithLease(leaseRes.ID))
if err != nil {panic(err)
}
}
time.Sleep(time.Second * time.Duration(sleeptime))
}
}
//-----------------------------workerId---------------------------
type SnowFlakeWorker struct{
mu sync.Mutex // 互斥锁
LastTimestamp int64 // 上一次的工夫距离
WorkerID int64 // 该服务的 wokerID
Sequence int64 // 同一时间戳下的序列号
}
func New(wokerID int64) *SnowFlakeWorker{
return &SnowFlakeWorker{
WorkerID: wokerID,
LastTimestamp: 0,
Sequence: 0,
}
}
func (s *SnowFlakeWorker) getMilliSeconds() int64{return time.Now().UnixNano() / 1e6 // 以后工夫的毫秒数}
func (s *SnowFlakeWorker) NextID() (uint64,error){s.mu.Lock() // 加锁
defer s.mu.Unlock()
timeStamp := s.getMilliSeconds() // 以后工夫毫秒数
if timeStamp < s.LastTimestamp { // 以后工夫毫秒数小于上一次的毫秒数,谬误间接抛出异样
return 0,errors.New("currentTime is before timestamp")
}
if timeStamp == s.LastTimestamp { // 如果相等则 sequenc 加 1
s.Sequence = (s.Sequence + 1) & MAXSEQUENCE
if s.Sequence == 0 { // 加 1 取余 MAXSEQUENCE 阐明以后毫秒数的序列号应用结束,须要期待下一个毫秒数
for timeStamp <= s.LastTimestamp { // 期待到下一个毫秒数就退出
timeStamp = s.getMilliSeconds()}
}
}else {s.Sequence = 0 // 如果大于 LastTimestamp 则 sequence 为 0}
s.LastTimestamp = timeStamp
return uint64((timeStamp - TIME_START_STAMP) << TIMESTAMP_OFFSET | s.WorkerID << WORKERIDBits |s.Sequence),nil
}
func main() {wg.Add(1)
err := WorkerId()
if err != nil {log.Println("worker inited error:",err)
return
}
currentWorkerNodeNum, _ := strconv.Atoi(CurrentWorkNodeNum)
worker := New(int64(currentWorkerNodeNum))
for i := 1;i<10;i++ {id, err := worker.NextID()
if err != nil {log.Println("generate snowflake id,error:",err)
return
}
log.Println("snowflakeId:",id)
}
wg.Wait()}
3. 测试后果
2021-08-03 17:38:16.416241 I | worker1:1
2021-08-03 17:38:16.416241 I | worker2:2
2021-08-03 17:38:16.416241 I | snowflakeId: 159636453498817536
2021-08-03 17:38:16.417196 I | snowflakeId: 159636453498817537
2021-08-03 17:38:16.417196 I | snowflakeId: 159636453503011840
2021-08-03 17:38:16.417196 I | snowflakeId: 159636453503011841
2021-08-03 17:38:16.417196 I | snowflakeId: 159636453503011842
2021-08-03 17:38:16.417196 I | snowflakeId: 159636453503011843
2021-08-03 17:38:16.417196 I | snowflakeId: 159636453503011844
2021-08-03 17:38:16.417196 I | snowflakeId: 159636453503011845
2021-08-03 17:38:16.417196 I | snowflakeId: 159636453503011846
2021-08-03 17:38:16.417196 I | active currerntNode : 3
正文完