共计 5330 个字符,预计需要花费 14 分钟才能阅读完成。
摘要: 在单过程的零碎中,当存在多个线程能够同时扭转某个变量时,就须要对变量或代码块做同步,使其在批改这种变量时可能线性执行打消并发批改变量,而同步实质上通过锁来实现。
本文分享自华为云社区《还不会应用分布式锁?从零开始基于 etcd 实现分布式锁》,原文作者:aoho。
为什么须要分布式锁?
在单过程的零碎中,当存在多个线程能够同时扭转某个变量时,就须要对变量或代码块做同步,使其在批改这种变量时可能线性执行打消并发批改变量。而同步实质上通过锁来实现。为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,那么须要在某个中央做个标记,这个标记必须每个线程都能看到,当标记不存在时能够设置该标记,其余后续线程发现曾经有标记了则期待领有标记的线程完结同步代码块勾销标记后再去尝试设置标记。
而在分布式环境下,数据一致性问题始终是难点。相比于单过程,分布式环境的状况更加简单。分布式与单机环境最大的不同在于其不是多线程而是多过程。多线程因为能够共享堆内存,因而能够简略的采取内存作为标记存储地位。而过程之间甚至可能都不在同一台物理机上,因而须要将标记存储在一个所有过程都能看到的中央。
常见的是秒杀场景,订单服务部署了多个服务实例。如秒杀商品有 4 个,第一个用户购买 3 个,第二个用户购买 2 个,现实状态下第一个用户能购买胜利,第二个用户提醒购买失败,反之亦可。而理论可能呈现的状况是,两个用户都失去库存为 4,第一个用户买到了 3 个,更新库存之前,第二个用户下了 2 个商品的订单,更新库存为 2,导致业务逻辑出错。
在下面的场景中,商品的库存是共享变量,面对高并发情景,须要保障对资源的拜访互斥。在单机环境中,比方 Java 语言中其实提供了很多并发解决相干的 API,然而这些 API 在分布式场景中就无能为力了,因为分布式系统具备多线程和多过程的特点,且散布在不同机器中,synchronized 和 lock 关键字将失去原有锁的成果,。仅依赖这些语言本身提供的 API 并不能实现分布式锁的性能,因而须要咱们想想其它办法实现分布式锁。
常见的锁计划如下:
- 基于数据库实现分布式锁
- 基于 Zookeeper 实现分布式锁
- 基于缓存实现分布式锁,如 redis、etcd 等
上面咱们简略介绍下这几种锁的实现,并重点介绍 etcd 实现锁的办法。
基于数据库的锁
基于数据库的锁实现也有两种形式,一是基于数据库表,另一种是基于数据库的排他锁。
基于数据库表的增删
基于数据库表增删是最简略的形式,首先创立一张锁的表次要蕴含下列字段:办法名,工夫戳等字段。
具体应用的办法为:当须要锁住某个办法时,往该表中插入一条相干的记录。须要留神的是,办法名有唯一性束缚。如果有多个申请同时提交到数据库的话,数据库会保障只有一个操作能够胜利,那么咱们就能够认为操作胜利的那个线程取得了该办法的锁,能够执行办法体内容。执行结束,须要删除该记录。
对于上述计划能够进行优化,如利用主从数据库,数据之间双向同步。一旦主库挂掉,将应用服务疾速切换到从库上。除此之外还能够记录以后取得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果以后机器的主机信息和线程信息在数据库能够查到的话,间接把锁调配给该线程,实现可重入锁。
基于数据库排他锁
咱们还能够通过数据库的排他锁来实现分布式锁。基于 Mysql 的 InnoDB 引擎,能够应用以下办法来实现加锁操作:
public void lock(){connection.setAutoCommit(false)
int count = 0;
while(count < 4){
try{
select * from lock where lock_name=xxx for update;
if(后果不为空){
// 代表获取到锁
return;
}
}catch(Exception e){ }
// 为空或者抛异样的话都示意没有获取到锁
sleep(1000);
count++;
}
throw new LockException();}
在查问语句前面减少 for update,数据库会在查问过程中给数据库表减少排他锁。当某条记录被加上排他锁之后,其余线程无奈再在该行记录上减少排他锁。其余没有获取到锁的就会阻塞在上述 select 语句上,可能的后果有 2 种,在超时之前获取到了锁,在超时之前仍未获取到锁。
取得排它锁的线程即可取得分布式锁,当获取到锁之后,能够执行业务逻辑,执行完业务之后开释锁。
基于数据库锁的总结
下面两种形式都是依赖数据库的一张表,一种是通过表中的记录的存在状况确定以后是否有锁存在,另外一种是通过数据库的排他锁来实现分布式锁。长处是间接借助现有的关系型数据库,简略且容易了解;毛病是操作数据库须要肯定的开销,性能问题以及 SQL 执行超时的异样须要思考。
基于 Zookeeper
基于 Zookeeper 的长期节点和程序个性能够实现分布式锁。
申请对某个办法加锁时,在 Zookeeper 上与该办法对应的指定节点的目录下,生成一个惟一的长期有序节点。当须要获取锁时,只须要判断有序节点中该节点是否为序号最小的一个。业务逻辑执行实现开释锁,只需将这个长期节点删除即可。这种形式也能够防止因为服务宕机导致的锁无奈开释,而产生的死锁问题。
Netflix 开源了一套 Zookeeper 客户端框架 curator,你能够自行去看一下具体应用办法。Curator 提供的 InterProcessMutex 是分布式锁的一种实现。acquire 办法获取锁,release 办法开释锁。另外,锁开释、阻塞锁、可重入锁等问题都能够无效解决。
对于阻塞锁的实现,客户端能够通过在 Zookeeper 中创立程序节点,并且在节点上绑定监听器 Watch。一旦节点发生变化,Zookeeper 会告诉客户端,客户端能够查看本人创立的节点是不是以后所有节点中序号最小的,如果是就获取到锁,便能够执行业务逻辑。
Zookeeper 实现的分布式锁也存在一些缺点。在性能上可能不如基于缓存实现的分布式锁。因为每次在创立锁和开释锁的过程中,都要动态创建、销毁刹时节点来实现锁性能。
此外,Zookeeper 中创立和删除节点只能通过 Leader 节点来执行,而后将数据同步到集群中的其余节点。分布式环境中不免存在网络抖动,导致客户端和 Zookeeper 集群之间的 session 连贯中断,此时 Zookeeper 服务端认为客户端挂了,就会删除长期节点。其余客户端就能够获取到分布式锁了,导致了同时获取锁的不统一问题。
基于缓存实现分布式锁
绝对于基于数据库实现分布式锁的计划来说,基于缓存来实现在性能方面会体现的更好一点,存取速度快很多。而且很多缓存是能够集群部署的,能够解决单点问题。基于缓存的锁有好几种,如 memcached、redis、本文上面次要解说基于 etcd 实现分布式锁。
通过 etcd txn 实现分布式锁
通过 etcd 实现分布式锁,同样须要满足一致性、互斥性和可靠性等要求。etcd 中的事务 txn、lease 租约以及 watch 监听个性,可能使得基于 etcd 实现上述要求的分布式锁。
思路剖析
通过 etcd 的事务个性能够帮忙咱们实现一致性和互斥性。etcd 的事务个性,应用的 IF-Then-Else 语句,IF 语言判断 etcd 服务端是否存在指定的 key,即该 key 创立版本号 create_revision 是否为 0 来查看 key 是否已存在,因为该 key 已存在的话,它的 create_revision 版本号就不是 0。满足 IF 条件的状况下则应用 then 执行 put 操作,否则 else 语句返回抢锁失败的后果。当然,除了应用 key 是否创立胜利作为 IF 的判断根据,还能够创立前缀雷同的 key,比拟这些 key 的 revision 来判断分布式锁应该属于哪个申请。
客户端申请在获取到分布式锁之后,如果产生异样,须要及时将锁给开释掉。因而须要租约,当咱们申请分布式锁的时候须要指定租约工夫。超过 lease 租期工夫将会主动开释锁,保障了业务的可用性。是不是这样就够了呢?在执行业务逻辑时,如果客户端发动的是一个耗时的操作,操作未实现的请状况下,租约工夫过期,导致其余申请获取到分布式锁,造成不统一。这种状况下则须要续租,即刷新租约,使得客户端可能和 etcd 服务端放弃心跳。
具体实现
咱们基于如上剖析的思路,绘制出实现 etcd 分布式锁的流程图,如下所示:
基于 Go 语言实现的 etcd 分布式锁,测试代码如下所示:
func TestLock(t *testing.T) {
// 客户端配置
config = clientv3.Config{Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
}
// 建设连贯
if client, err = clientv3.New(config); err != nil {fmt.Println(err)
return
}
// 1. 上锁并创立租约
lease = clientv3.NewLease(client)
if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {panic(err)
}
leaseId = leaseGrantResp.ID
// 2 主动续约
// 创立一个可勾销的租约,次要是为了退出的时候可能开释
ctx, cancelFunc = context.WithCancel(context.TODO())
// 3. 开释租约
defer cancelFunc()
defer lease.Revoke(context.TODO(), leaseId)
if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {panic(err)
}
// 续约应答
go func() {
for {
select {
case keepResp = <-keepRespChan:
if keepRespChan == nil {fmt.Println("租约曾经生效了")
goto END
} else { // 每秒会续租一次, 所以就会受到一次应答
fmt.Println("收到主动续租应答:", keepResp.ID)
}
}
}
END:
}()
// 1.3 在租约工夫内去抢锁(etcd 外面的锁就是一个 key)kv = clientv3.NewKV(client)
// 创立事务
txn = kv.Txn(context.TODO())
//if 不存在 key,then 设置它,else 抢锁失败
txn.If(clientv3.Compare(clientv3.CreateRevision("lock"), "=", 0)).
Then(clientv3.OpPut("lock", "g", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet("lock"))
// 提交事务
if txnResp, err = txn.Commit(); err != nil {panic(err)
}
if !txnResp.Succeeded {fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}
// 抢到锁后执行业务逻辑,没有抢到退出
fmt.Println("解决工作")
time.Sleep(5 * time.Second)
}
预期的执行后果如下所示:
=== RUN TestLock
解决工作
收到主动续租应答: 7587848943239472601
收到主动续租应答: 7587848943239472601
收到主动续租应答: 7587848943239472601
--- PASS: TestLock (5.10s)
PASS
总得来说,如上对于 etcd 分布式锁的实现过程分为四个步骤:
- 客户端初始化与建设连贯;
- 创立租约,主动续租;
- 创立事务,获取锁;
- 执行业务逻辑,最初开释锁。
创立租约的时候,须要创立一个可勾销的租约,次要是为了退出的时候可能开释。开释锁对应的步骤,在下面的 defer 语句中。当 defer 租约关掉的时候,分布式锁对应的 key 就会被开释掉了。
小结
本文次要介绍了基于 etcd 实现分布式锁的案例。首先介绍了分布式锁产生的背景以及必要性,分布式架构不同于单体架构,波及到多服务之间多个实例的调用,跨过程的状况下应用编程语言自带的并发原语没有方法实现数据的一致性,因而分布式锁呈现,用来解决分布式环境中的资源互斥操作。接着介绍了基于数据库实现分布式锁的两种形式:数据表增删和数据库的排它锁。基于 Zookeeper 的长期节点和程序个性也能够实现分布式锁,这两种形式或多或少存在性能和稳定性方面的缺点。
接着本文重点介绍了基于 etcd 实现分布式锁的计划,依据 etcd 的特点,利用事务 txn、lease 租约以及 watch 监测实现分布式锁。
在咱们下面的案例中,抢锁失败,客户端就间接返回了。那么当该锁被开释之后,或者持有锁的客户端呈现了故障退出了,其余锁如何疾速获取锁呢?所以上述代码能够基于 watch 监测个性进行改良,各位同学能够自行试试。
点击关注,第一工夫理解华为云陈腐技术~