当初有一个需要,利用启动时须要初始化一些数据,为了保障高可用,会启动多正本(replicas >= 3),如何保证数据不会反复?

计划一:数据带上主键

最简略的办法,初始化数据都带上主键,这样主键抵触就会报错。然而这么做咱们须要对抵触的谬误进行额定解决,因为插入咱们个别会复用已写好的 DAO 层代码。

另外,初始化数据的主键可能是动静生成的,并不想把主键写死。所以上面来介绍此次的配角:基于 MySQL 的分布式锁的解决方案。

计划二:基于 MySQL 的分布式锁

多正本分布式应用,在这种 n 选 1 竞争某个资源或执行权的场景,个别都会用到分布式锁。分布式有很多种实现形式,如基于 redis,etcd,zookeeper,file 等零碎。实质上,就是找个多个节点都认可的中央保留数据,通过数据竞态来实现锁,当然这个依赖最好是高可用,否则会引发单点故障。

多个正本都应用同一个 MySQL,所以咱们能够很不便的基于 MySQL 实现一个分布式锁。原理很简略,利用惟一索引保障只有一个副本能插入某条数据,插入胜利则示意取锁胜利,执行结束则删除该条数据开释锁。

建一个表用来寄存锁数据,将 Action 设为惟一索引,示意对某个动作加锁,如:init 初始化,cronjob 定时工作等不同动作之间加锁互不影响。

type lock struct {    Id        string `gorm:"primary_key"`    CreatedAt time.Time    UpdatedAt time.Time    ExpiredAt time.Time // 锁过期工夫    Action    string `gorm:"unique;not null"`    Holder    string // 持锁人信息,能够应用 hostname}

既然有过期工夫,那么持锁工夫设为多长适合呢?设置太短可能逻辑还没执行完锁就过期了;设置太长如果程序中途挂了没有开释锁,那么这段时间所有节点都拿不到锁。

要解决这个问题咱们能够应用租约机制(lease),设置较短的持锁工夫,而后在持锁周期内,一直缩短持锁工夫,直到被动开释。这样即便程序解体没有 UnLock,锁也会因为没有刷新租约很快过期,不影响其余节点获取锁。

Lock 时启动一个 goroutine 刷新租约,Unlock 时通过 stopCh 将其进行。

另外,MySQL 中并没有线程去解决过期的记录,所以咱们在调用 Lock 时先尝试将过期记录删掉。

外围代码:

func NewLockDb(action, holder string, lease time.Duration) *lockDb {    return &lockDb{        db:       GetDB(context.Background()),        stopCh:   make(chan struct{}),        action:   action,        holder:   holder,        leaseAge: lease,    }}func (s *lockDb) Lock() (bool, error) {    err := s.cleanExpired()    if err != nil {        return false, errx.WithStackOnce(err)    }    err = s.db.Create(&lock{        ExpiredAt: time.Now().Add(s.leaseAge),        Action:    s.action,        Holder:    s.holder,    }).Error    if err != nil {        // Duplicate entry '<action_val>' for key 'action'        if strings.Contains(err.Error(), "Duplicate entry") {            return false, nil        }        return false, errx.WithStackOnce(err)    }    s.startLease()    log.Debugf("%s get lock", s.holder)    return true, nil}func (s *lockDb) UnLock() error {    s.stopLease()    var err error    defer func() {        err = s.db.            Where("action = ? and holder = ?", s.action, s.holder).            Delete(&lock{}).            Error    }()    return err}func (s *lockDb) cleanExpired() error {    err := s.db.        Where("expired_at < ?", time.Now()).        Delete(&lock{}).        Error    return err}func (s *lockDb) startLease() {    go func() {        // 残余 1/4 时刷新租约        ticker := time.NewTicker(s.leaseAge * 3 / 4)        for {            select {            case <-ticker.C:                err := s.refreshLease()                if err != nil {                    log.Errorf("refreash lease err: %s", err)                } else {                    log.Debug("lease refreshed")                }            case <-s.stopCh:                log.Debug("lease stopped")                return            }        }    }()}func (s *lockDb) stopLease() {    close(s.stopCh)}func (s *lockDb) refreshLease() error {    err := s.db.Model(&lock{}).        Where("action = ? and holder = ?", s.action, s.holder).        Update("expired_at", time.Now().Add(s.leaseAge)).        Error    return err}

应用及测试:

func TestLock(t *testing.T) {    i := 3    wg := &sync.WaitGroup{}    wg.Add(i)    for i > 0 {        holder := strconv.Itoa(i)        action := "test"        i--        go func() {            defer wg.Done()            locker := dbcore.NewLockDb(action, holder, 10*time.Second)            if _, err := locker.Lock(); err != nil {                t.Logf("not hold the lock, err: %+v", err)                return            }            time.Sleep(30 * time.Second)            locker.UnLock()        }()    }    wg.Wait()}

残缺代码:https://github.com/win5do/go-...

这个分布式锁实现在初始数据场景是够用了,但并不完满,例如:依赖工夫同步,不能容忍工夫偏斜;获取锁不是阻塞的,如果要抢锁须要应用方自旋; 锁不可重入,粒度是过程级别,同一个 Action,以后过程获取锁后,开释后能力再次获取锁。

大家能够思考一下如何欠缺。