关于golang:golang-结合etcd利用雪花算法实现全局递增唯一ID

1.雪花算法


图片来自 https://zhuanlan.zhihu.com/p/…

  • 生成的序列号是由64位示意
  • 最高位为0,示意是负数
  • 第2到第42位示意工夫距离,其计算是应用以后工夫减去一个起始工夫失去一个工夫距离,41位大概能够保留69年的工夫范畴,也就是说起始工夫是2021年的话,大概在2090年用完,足够应用了
  • 第43位到第52位为具体服务的id,服务的id是应用etcd来实现全局惟一的也能够应用redis进行实现,最多1024个,也就是说所有服务最多1024个,包含起来多个的服务
  • 第53位到第64位为同一时间下递增的序列号

2.源码如下,钻研能够看正文

实现只放在一个源文件外面了,具体的能够进行放在不同的包中进行调用,雪花算法的实现能够独自放在一个worker包中供所有服务调用

package main

import (
    "context"
    "errors"
    "go.etcd.io/etcd/clientv3"
    "log"
    "strconv"
    "sync"
    "time"
)

const(
    WORKERIDBits = 10 //wokerId 占10位bit
    SEQUENCEBITS = 12 //序列号占的bit位数
    MAXSEQUENCE = int64(-1) ^ (int64(-1) << SEQUENCEBITS) //序列号的最大值
    MAXWORKERID = int64(-1) ^ (int64(-1) << WORKERIDBits) //workerId的最大值

    TIMESTAMP_OFFSET = uint(22) //工夫戳的偏移位数
    WORKERID_OFFSET = uint(12) //workerId的偏移位数
    TIME_START_STAMP = int64(1589923200000) // 起始工夫 2020-05-20 08:00:00 +0800 CST
)

//-----------------------------workerId---------------------------
var CurrentWorkNodeNum string     //以后节点number 节点number从1开始 最大值为1024
var WokerNodePrefix = "worker" //节点key前缀 节点值为CurrentWorkNodeNum   例如 worker1 = 1 worker2 = 2
var wg sync.WaitGroup

func WorkerId() error {
    if len(CurrentWorkNodeNum) != 0 {                        //CurrentWorkNodeNum 如果曾经初始化过了,间接返回
        return errors.New("CurrentWorkNodeNum inited")
    }
    client, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"http://192.168.56.111:2379"},      //etcd端 能够抽离进去
        DialTimeout: 2 * time.Second,
    })
    if err != nil {
        log.Println("create client err:",err)
    }
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    resp,err := client.Get(ctx, WokerNodePrefix,clientv3.WithPrefix())      //获取所有前缀为worker的节点
    cancel()
    if err != nil {
        return errors.New("get prefix worker node error")
    }
    existNodeMap := make(map[int]int)       //定义一个map,保留曾经存在的节点
    for _,ev := range resp.Kvs {
        num, _ := strconv.Atoi(string(ev.Value))
        existNodeMap[num] = num             //put到existNodeMap中
        log.Printf("%s:%s \n",ev.Key,ev.Value)
    }
    count := 1                                //从1到1024找最小的number
    for  ;count < 1025; count++ {
        if _, ok := existNodeMap[count];!ok {   //如果不存在,就会间接break
            CurrentWorkNodeNum = strconv.Itoa(count)
            break
        }
    }
    if count == 1024 {                 //代表1024个节点都曾经用完了,或者局部节点曾经挂掉了,而后key的租期还没有完结,能够重新启动
        return errors.New("服务节点数目大于1024")
    }
    go ActiveCurrentWorkerNode(client) //启动一个协程始终激活以后key,如果以后服务挂了,key就会在租期完结后查问不到了
    return nil
}

func ActiveCurrentWorkerNode(client *clientv3.Client){
    for {
        leasetime := int64(60)    //租期工夫
        sleeptime := 50    //以后协程睡眠工夫,小于租期工夫即可
        lease := clientv3.NewLease(client)
        log.Println("active currerntNode :",CurrentWorkNodeNum)
        if leaseRes,err := lease.Grant(context.TODO(),leasetime);err != nil {
            panic(err)
        }else {
            _, err := client.Put(context.Background(), WokerNodePrefix+CurrentWorkNodeNum, CurrentWorkNodeNum,clientv3.WithLease(leaseRes.ID))
            if err != nil {
                panic(err)
            }
        }
        time.Sleep(time.Second * time.Duration(sleeptime))
    }
}
//-----------------------------workerId---------------------------

type SnowFlakeWorker struct{
    mu sync.Mutex   //互斥锁
    LastTimestamp int64 //上一次的工夫距离
    WorkerID int64   //该服务的wokerID
    Sequence int64 //同一时间戳下的序列号
}

func New(wokerID int64) *SnowFlakeWorker{
    return &SnowFlakeWorker{
        WorkerID: wokerID,
        LastTimestamp: 0,
        Sequence: 0,
    }
}

func (s *SnowFlakeWorker) getMilliSeconds() int64{
    return time.Now().UnixNano() / 1e6      //以后工夫的毫秒数
}

func (s *SnowFlakeWorker) NextID() (uint64,error){
    s.mu.Lock()     //加锁
    defer s.mu.Unlock()
    timeStamp := s.getMilliSeconds()   //以后工夫毫秒数
    if timeStamp < s.LastTimestamp {   //以后工夫毫秒数小于上一次的毫秒数,谬误间接抛出异样
         return 0,errors.New("currentTime is before timestamp")
    }
    if timeStamp == s.LastTimestamp {     //如果相等则sequenc加1
         s.Sequence = (s.Sequence + 1) & MAXSEQUENCE
        if s.Sequence == 0 {     //加1取余MAXSEQUENCE 阐明以后毫秒数的序列号应用结束,须要期待下一个毫秒数
            for timeStamp <= s.LastTimestamp {   //期待到下一个毫秒数就退出
                 timeStamp = s.getMilliSeconds()
            }
        }
    }else {
        s.Sequence = 0 //如果大于LastTimestamp 则sequence为0
    }
    s.LastTimestamp = timeStamp
    return uint64((timeStamp - TIME_START_STAMP) << TIMESTAMP_OFFSET | s.WorkerID << WORKERIDBits |s.Sequence),nil
}

func main() {
    wg.Add(1)
    err := WorkerId()
    if err != nil {
        log.Println("worker inited error:",err)
        return
    }
    currentWorkerNodeNum, _ := strconv.Atoi(CurrentWorkNodeNum)
    worker := New(int64(currentWorkerNodeNum))
    for i := 1;i<10;i++ {
        id, err := worker.NextID()
        if err != nil {
            log.Println("generate snowflake id,error:",err)
            return
        }
        log.Println("snowflakeId:",id)
    }
    wg.Wait()
}

3.测试后果

2021-08-03 17:38:16.416241 I | worker1:1
2021-08-03 17:38:16.416241 I | worker2:2
2021-08-03 17:38:16.416241 I | snowflakeId: 159636453498817536
2021-08-03 17:38:16.417196 I | snowflakeId: 159636453498817537
2021-08-03 17:38:16.417196 I | snowflakeId: 159636453503011840
2021-08-03 17:38:16.417196 I | snowflakeId: 159636453503011841
2021-08-03 17:38:16.417196 I | snowflakeId: 159636453503011842
2021-08-03 17:38:16.417196 I | snowflakeId: 159636453503011843
2021-08-03 17:38:16.417196 I | snowflakeId: 159636453503011844
2021-08-03 17:38:16.417196 I | snowflakeId: 159636453503011845
2021-08-03 17:38:16.417196 I | snowflakeId: 159636453503011846
2021-08-03 17:38:16.417196 I | active currerntNode : 3

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理