基于hashicorp/raft的分布式一致性实战教学

7次阅读

共计 11561 个字符,预计需要花费 29 分钟才能阅读完成。

本文由云 + 社区发表作者:_Super_
导语:hashicorp/raft 是 raft 算法的一种比较流行的 golang 实现,基于它能够比较方便的构建具有强一致性的分布式系统。本文通过实现一个简单的分布式缓存系统来介绍使用 hashicorp/raft 来构建分布式应用程序的方法。

1. 背景
​ 对于后台开发来说,随着业务的发展,由于访问量增大的压力和数据容灾的需要,一定会需要使用分布式的系统,而分布式势必会引入一致性的问题。
​ 一般把一致性分为三种类型:弱一致性、最终一致性、强一致性。这三种模型的一致性强度逐渐递增,实现代价也越来越大。通常弱一致性和最终一致性可以异步冗余,强一致性则是同步冗余,而同步也就意味着影响性能。
​ 对常见的互联网业务来说,使用弱一致性或者最终一致性即可。而使用强一致性一方面会影响系统的性能,另一方面实现也比较困难。常见的一致性协议如 zab、raft、paxos,如果由业务纯自己来实现的话代价较大,而且很可能会因为考虑不周而引入其他问题。
​ 对于一些需要强一致性,而又希望花费较小代价的业务来说,使用开源的一致性协议实现组件会是个不错的选择。hashicorp/raft 是 raft 协议的一种 golang 实现,由 hashicorp 公司实现并开源,已经在 consul 等软件中使用。它封装了 raft 协议的 leader 选举、log 同步等底层实现,基于它能够相对比较容易的构建强一致性的分布式系统,下面以实现一个简单的分布式缓存服务 (取名叫 stcache) 来演示 hashicorp/raft 的具体使用,完整代码可以在 github 上下载。
2. raft 简介
​ 首先还是简单介绍下 raft 协议。这里不详细介绍 raft 协议,只是为了方便理解后面的 hashicorp/raft 的使用步骤而简单列举出 raft 的一点原理。具体的 raft 协议可以参考 raft 的官网,如果已经了解 raft 协议可以直接跳过这一节。
​ raft 是一种相对易于理解的一致性的协议。它属于 leader-follower 型的协议,有且只有一个 leader,所有的事务请求都由 leader 处理,leader 征求 follower 的意见,在集群内部达成一致,决定是否执行事务。当 leader 出现故障,集群中的 follower 会通过投票的方式选出一个新的 leader,维持集群运行。
​ raft 的理论基础是 Replicated State Machine,Replicated State Machine 需要满足如下的条件:一个 server 可以有多个 state,多个 server 从同一个 start 状态出发,都执行相同的 command 序列,最终到达的 stare 是一样的。如上图,一般使用 replicated log 来记录 command 序列,client 的请求被 leader 转化成 log entry,然后通过一致性模块把 log 同步到各个 server,让各个 server 的 log 一致。每个 server 都有 state Machine,从 start 出发,执行完这些 log 中的 command 后,server 处于相同的 state。所以 raft 协议的关键就是保证各个 server 的 log 一致,然后每个 server 通过执行相同的 log 来达到一致的状态,理解这点有助于掌握后面对 hashicorp/raft 的具体使用。

3. hashicorp/raft 使用
3.1 单机版
​ 首先我们创建一个单机版本的 stcache,它是一个简单的缓存服务器,在服务内部用一个 map 来保存数据,只提供简单的 get 和 set 操作。
type cacheManager struct {
data map[string]string
sync.RWMutex
}
​ 然后 stcache 开启一个 http 服务,提供两个 api,第一个是 set 接口,用于设置数据到缓存,成功时返回 ok,失败返回错误信息:

​ 第二个是 get 接口,根据 key 查询具体的 value:

​ 下面我们在单机版 stcache 的基础上逐步扩充,让它成为一个具有强一致性的分布式系统。
3.2 创建节点
// NewRaft is used to construct a new Raft node. It takes a configuration, as well
// as implementations of various interfaces that are required. If we have any
// old state, such as snapshots, logs, peers, etc, all those will be restored
// when creating the Raft node.
func NewRaft(conf *Config,
fsm FSM,
logs LogStore,
stable StableStore,
snaps SnapshotStore,
trans Transport) (*Raft, error) {
​ hashicorp/raft 库提供 NewRaft 方法来创建一个 raft 节点,这也是使用这个库的最重要的一个 api。NewRaft 需要调用层提供 6 个参数,分别是:

Config:节点配置
FSM:finite state machine,有限状态机
LogStore:用来存储 raft 的日志
StableStore:稳定存储,用来存储 raft 集群的节点信息等
SnapshotStore: 快照存储,用来存储节点的快照信息
Transport:raft 节点内部的通信通道

​ 下面从这些参数入手看应用程序需要做哪些工作。
3.3 Config
​ config 是节点的配置信息,我们直接使用 raft 默认的配置,然后用监听的地址来作为节点的 id。config 里面还有一些可配置的项,后面我们用到的时候再说。
raftConfig := raft.DefaultConfig()
raftConfig.LocalID = raft.ServerID(opts.raftTCPAddress)
raftConfig.Logger = log.New(os.Stderr, “raft: “, log.Ldate|log.Ltime)
3.4 LogStore 和 StableStore
​ LogStore、StableStore 分别用来存储 raft log、节点状态信息,hashicorp 提供了一个 raft-boltdb 来实现底层存储,它是一个嵌入式的数据库,能够持久化存储数据,我们直接用它来实现 LogStore 和 StableStore.
logStore, err := raftboltdb.NewBoltStore(filepath.Join(opts.dataDir,
“raft-log.bolt”))
stableStore, err := raftboltdb.NewBoltStore(filepath.Join(opts.dataDir,
“raft-stable.bolt”))
3.5 SnapshotStore
​ SnapshotStore 用来存储快照信息,对于 stcache 来说,就是存储当前的所有的 kv 数据,hashicorp 内部提供 3 中快照存储方式,分别是:

DiscardSnapshotStore:不存储,忽略快照,相当于 /dev/null,一般用于测试
FileSnapshotStore:文件持久化存储
InmemSnapshotStore:内存存储,不持久化,重启程序会丢失

​ 这里我们使用文件持久化存储。snapshotStore 只是提供了一个快照存储的介质,还需要应用程序提供快照生成的方式,后面我们再具体说。
snapshotStore, err := raft.NewFileSnapshotStore(opts.dataDir, 1, os.Stderr)
3.6 Transport
​ Transport 是 raft 集群内部节点之间的通信渠道,节点之间需要通过这个通道来进行日志同步、leader 选举等。hashicorp/raft 内部提供了两种方式来实现,一种是通过 TCPTransport,基于 tcp,可以跨机器跨网络通信;另一种是 InmemTransport,不走网络,在内存里面通过 channel 来通信。显然一般情况下都使用 TCPTransport 即可,在 stcache 里也采用 tcp 的方式。
func newRaftTransport(opts *options) (*raft.NetworkTransport, error) {
address, err := net.ResolveTCPAddr(“tcp”, opts.raftTCPAddress)
if err != nil {
return nil, err
}
transport, err := raft.NewTCPTransport(address.String(), address, 3, 10*time.Second, os.Stderr)
if err != nil {
return nil, err
}
return transport, nil
}
3.7 FSM
​ 最后再看 FSM,它是一个 interface,需要应用程序来实现 3 个 funcition。
/*FSM provides an interface that can be implemented by
clients to make use of the replicated log.*/
type FSM interface {
/* Apply log is invoked once a log entry is committed.
It returns a value which will be made available in the
ApplyFuture returned by Raft.Apply method if that
method was called on the same Raft node as the FSM.*/
Apply(*Log) interface{}
// Snapshot is used to support log compaction. This call should
// return an FSMSnapshot which can be used to save a point-in-time
// snapshot of the FSM. Apply and Snapshot are not called in multiple
// threads, but Apply will be called concurrently with Persist. This means
// the FSM should be implemented in a fashion that allows for concurrent
// updates while a snapshot is happening.
Snapshot() (FSMSnapshot, error)
// Restore is used to restore an FSM from a snapshot. It is not called
// concurrently with any other command. The FSM must discard all previous
// state.
Restore(io.ReadCloser) error
}
​ 第一个是 Apply,当 raft 内部 commit 了一个 log entry 后,会记录在上面说过的 logStore 里面,被 commit 的 log entry 需要被执行,就 stcache 来说,执行 log entry 就是把数据写入缓存,即执行 set 操作。我们改造 doSet 方法,这里不再直接写缓存,而是调用 raft 的 Apply 方式,为这次 set 操作生成一个 log entry,这里面会根据 raft 的内部协议,在各个节点之间进行通信协作,确保最后这条 log 会在整个集群的节点里面提交或者失败。
// doSet saves data to cache, only raft master node provides this api
func (h *httpServer) doSet(w http.ResponseWriter, r *http.Request) {
// … get params from request url

event := logEntryData{Key: key, Value: value}
eventBytes, err := json.Marshal(event)
if err != nil {
h.log.Printf(“json.Marshal failed, err:%v”, err)
fmt.Fprint(w, “internal error\n”)
return
}

applyFuture := h.ctx.st.raft.raft.Apply(eventBytes, 5*time.Second)
if err := applyFuture.Error(); err != nil {
h.log.Printf(“raft.Apply failed:%v”, err)
fmt.Fprint(w, “internal error\n”)
return
}

fmt.Fprintf(w, “ok\n”)
}
​ 对 follower 节点来说,leader 会通知它来 commit log entry,被 commit 的 log entry 需要调用应用层提供的 Apply 方法来执行日志,这里就是从 logEntry 拿到具体的数据,然后写入缓存里面即可。
// Apply applies a Raft log entry to the key-value store.
func (f *FSM) Apply(logEntry *raft.Log) interface{} {
e := logEntryData{}
if err := json.Unmarshal(logEntry.Data, &e); err != nil {
panic(“Failed unmarshaling Raft log entry.”)
}
ret := f.ctx.st.cm.Set(e.Key, e.Value)
return ret
}
3.7.1 snapshot
​ FSM 需要提供的另外两个方法是 Snapshot()和 Restore(),分别用于生成一个快照结构和根据快照恢复数据。首先我们需要定义快照,hashicorp/raft 内部定义了快照的 interface,需要实现两个 func,Persist 用来生成快照数据,一般只需要实现它即可;Release 则是快照处理完成后的回调,不需要的话可以实现为空函数。
// FSMSnapshot is returned by an FSM in response to a Snapshot
// It must be safe to invoke FSMSnapshot methods with concurrent
// calls to Apply.
type FSMSnapshot interface {
// Persist should dump all necessary state to the WriteCloser ‘sink’,
// and call sink.Close() when finished or call sink.Cancel() on error.
Persist(sink SnapshotSink) error
// Release is invoked when we are finished with the snapshot.
Release()
}
​ 我们定义一个简单的 snapshot 结构,在 Persist 里面,自己把缓存里面的数据用 json 格式化的方式来生成快照,sink.Write 就是把快照写入 snapStore,我们刚才定义的是 FileSnapshotStore,所以会把数据写入文件。
type snapshot struct {
cm *cacheManager
}
// Persist saves the FSM snapshot out to the given sink.
func (s *snapshot) Persist(sink raft.SnapshotSink) error {
snapshotBytes, err := s.cm.Marshal()
if err != nil {
sink.Cancel()
return err
}
if _, err := sink.Write(snapshotBytes); err != nil {
sink.Cancel()
return err
}
if err := sink.Close(); err != nil {
sink.Cancel()
return err
}
return nil
}
func (f *snapshot) Release() {}
3.7.2 snapshot 保存与恢复
​ 而快照生成和保存的触发条件除了应用程序主动触发外,还可以在 Config 里面设置 SnapshotInterval 和 SnapshotThreshold,前者指每间隔多久生成一次快照,后者指每 commit 多少 log entry 后生成一次快照。需要两个条件同时满足才会生成和保存一次快照,默认 config 里面配置的条件比较高,我们可以自己修改配置,比如在 stcache 里面配置 SnapshotInterval 为 20s,SnapshotThreshold 为 2,表示当满足距离上次快照保存超过 20s,且 log 增加 2 条的时候,保存一个新的快照。
raftConfig := raft.DefaultConfig()
raftConfig.LocalID = raft.ServerID(opts.raftTCPAddress)
raftConfig.Logger = log.New(os.Stderr, “raft: “, log.Ldate|log.Ltime)
raftConfig.SnapshotInterval = 20 * time.Second
raftConfig.SnapshotThreshold = 2
​ 服务重启的时候,会先读取本地的快照来恢复数据,在 FSM 里面定义的 Restore 函数会被调用,这里我们就简单的对数据解析 json 反序列化然后写入内存即可。至此,我们已经能够正常的保存快照,也能在重启的时候从文件恢复快照数据。
// Restore stores the key-value store to a previous state.
func (f *FSM) Restore(serialized io.ReadCloser) error {
return f.ctx.st.cm.UnMarshal(serialized)
}

// UnMarshal deserializes cache data
func (c *cacheManager) UnMarshal(serialized io.ReadCloser) error {
var newData map[string]string
if err := json.NewDecoder(serialized).Decode(&newData); err != nil {
return err
}
c.Lock()
defer c.Unlock()
c.data = newData
return nil
}
3.8 集群建立
​ 集群最开始的时候只有一个节点,我们让第一个节点通过 bootstrap 的方式启动,它启动后成为 leader。
if opts.bootstrap {
configuration := raft.Configuration{
Servers: []raft.Server{
{
ID: raftConfig.LocalID,
Address: transport.LocalAddr(),
},
},
}
raftNode.BootstrapCluster(configuration)
}
​ 后续的节点启动的时候需要加入集群,启动的时候指定第一个节点的地址,并发送请求加入集群,这里我们定义成直接通过 http 请求。
// joinRaftCluster joins a node to raft cluster
func joinRaftCluster(opts *options) error {
url := fmt.Sprintf(“http://%s/join?peerAddress=%s”,
opts.joinAddress,
opts.raftTCPAddress)
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if string(body) != “ok” {
return errors.New(fmt.Sprintf(“Error joining cluster: %s”, body))
}
return nil
}
​ 先启动的节点收到请求后,获取对方的地址(指 raft 集群内部通信的 tcp 地址),然后调用 AddVoter 把这个节点加入到集群即可。申请加入的节点会进入 follower 状态,这以后集群节点之间就可以正常通信,leader 也会把数据同步给 follower。
// doJoin handles joining cluster request
func (h *httpServer) doJoin(w http.ResponseWriter, r *http.Request) {
vars := r.URL.Query()

peerAddress := vars.Get(“peerAddress”)
if peerAddress == “” {
h.log.Println(“invalid PeerAddress”)
fmt.Fprint(w, “invalid peerAddress\n”)
return
}
addPeerFuture := h.ctx.st.raft.raft.AddVoter(raft.ServerID(peerAddress),
raft.ServerAddress(peerAddress),
0, 0)
if err := addPeerFuture.Error(); err != nil {
h.log.Printf(“Error joining peer to raft, peeraddress:%s, err:%v, code:%d”, peerAddress, err, http.StatusInternalServerError)
fmt.Fprint(w, “internal error\n”)
return
}
fmt.Fprint(w, “ok”)
}
3.9 故障切换
​ 当集群的 leader 故障后,集群的其他节点能够感知到,并申请成为 leader,在各个 follower 中进行投票,最后选取出一个新的 leader。leader 选举是属于 raft 协议的内容,不需要应用程序操心,但是对有些场景而言,应用程序需要感知 leader 状态,比如对 stcache 而言,理论上只有 leader 才能处理 set 请求来写数据,follower 应该只能处理 get 请求查询数据。为了模拟说明这个情况,我们在 stcache 里面我们设置一个写标志位,当本节点是 leader 的时候标识位置 true,可以处理 set 请求,否则标识位为 false,不能处理 set 请求。
// doSet saves data to cache, only raft master node provides this api
func (h *httpServer) doSet(w http.ResponseWriter, r *http.Request) {
if !h.checkWritePermission() {
fmt.Fprint(w, “write method not allowed\n”)
return
}
// … set data
}
​ 当故障切换的时候,follower 变成了 leader,应用程序如何感知到呢?在 raft 结构里面提供有一个 eaderCh,它是 bool 类型的 channel,不带缓存,当本节点的 leader 状态有变化的时候,会往这个 channel 里面写数据,但是由于不带缓冲且写数据的协程不会阻塞在这里,有可能会写入失败,没有及时变更状态,所以使用 leaderCh 的可靠性不能保证。好在 raft Config 里面提供了另一个 channel NotifyCh,它是带缓存的,当 leader 状态变化时会往这个 chan 写数据,写入的变更消息能够缓存在 channel 里面,应用程序能够通过它获取到最新的状态变化。
​ 我们首先在初始化 config 时候创建一个带缓存的 chan,把它赋值给 config 里面的 NotifyCh,然后在节点启动后监听这个 chan,当本节点的 leader 状态变化时(变成 leader 或者从 leader 变成 follower),就能够从这个 chan 里面读取到 bool 值,并调整我们先前设置的写标志位,控制是否能否处理 set 操作。
func newRaftNode(opts *options, ctx *stCachedContext) (*raftNodeInfo, error) {
raftConfig := raft.DefaultConfig()
raftConfig.LocalID = raft.ServerID(opts.raftTCPAddress)
raftConfig.Logger = log.New(os.Stderr, “raft: “, log.Ldate|log.Ltime)
raftConfig.SnapshotInterval = 20 * time.Second
raftConfig.SnapshotThreshold = 2
leaderNotifyCh := make(chan bool, 1)
raftConfig.NotifyCh = leaderNotifyCh
// …
}
4. 成果演示
​ 做完上面的工作后,我们来测试下效果,我们同一台机器上启动 3 个节点来构成一个集群,第一个节点用 bootstrapt 的方式启动,成为 leader

​ 第二个节点和第三个节点启动时指定加入集群,成为 follower

​ 现在集群中有 3 个节点,leader 监听 127.0.01:6000 对外提供 set 和 get 接口,两个 follower 分别监听 127.0.0.1:6001 和 127.0.0.1:6002,对外提供 get 接口。
4.1 集群数据同步
​ 通过调用 leader 的 set 接口写入一个数据,key 是 ping,value 是 pong

​ 这时候能在两个 follower 上看见 apply 的日志,follower 节点写入了 log,并收到 leader 的通知提交数据。

​ 通过查询接口,也能从 follower 里面查询到刚才写入的数据,证明数据同步没有问题。

​ 有一点需要说明的事,我们这里从 follower 是可能读不到最新数据的。由于 leader 对 set 操作返回的时候,follower 可能还没有 apply 数据,所以从 follower 的 get 查询可能返回旧数据或者空数据。如果要保证能从 follower 查询到的一定是最新的数据还需要很多额外的工作,即做到 linearizable read,有兴趣可以看这篇测试文章,这里不再展开。
4.2 快照保存与恢复
​ 我们再通过 set 接口写入两个数据,能看见节点开始保存快照

​ 在指定的目录下面,能看见快照的具体信息,有两个文件,meta.json 保存了版本号、log 序号、集群节点地址等集群信息;state.bin 里面是快照数据,这里就是我们刚刚写入的数据被 json 序列化后的字符串。

​ 现在把节点都停止,然后重新启动 leader,内存的数据都丢失,它会从保存的快照文件里面恢复数据。重启 follower 也一样会从自己保存的快照里面加载数据。

4.3 leader 切换
​ 把 leader 和 follower 都重启恢复,现在 leader 监听 127.0.01:6000,只有它能执行 set 操作,follower 只能执行 get 操作

​ 我们停掉 leader 节点,两个 follower 会开始选举,这里 node2 赢得了选举,进入 leader 状态,并且它开始打开 set 操作

​ 我们再请求 node2 监听的 127.0.0.1:6001,发现已经可以正常写入数据了,leader 切换顺利完成。

​ 我们再重启原来的 leader 节点,它会变成 follower,并从新的 leader(也就是 node2)这里同步它所缺失的数据。
5. 总结
​ 上面所创建的 stcache 只是一个简单的示例程序,真正要做到在线上使用还有很多问题需要考虑,目前基于 hashicorp/raft 比较成熟的开源软件有 consul,如果有兴趣可以通过它做进一步研究。
​ 总的来说,hashicorp/raft 封装了 raft 的内部协议,提供简洁明了的使用方法,基于它能够很快速地构建出具有强一致性的应用程序。
此文已由腾讯云 + 社区在各渠道发布
获取更多新鲜技术干货,可以关注我们腾讯云技术社区 - 云加社区官方号及知乎机构号

正文完
 0