乐趣区

利用-etcd-进行-leader-选举实现服务高可用

原文地址:https://www.tony-yin.site/2019/05/15/Etcd_Service_HA/#

本文介绍如何通过 etcd 进行 leader 选举,从而实现服务高可用。

概述

Etcd 是什么?

Etcd是一个分布式的,一致的 key-value 存储,主要用于共享配置和服务发现。Etcd是由 CoreOS 开发并维护,通过 Raft 一致性算法处理日志复制以保证强一致性。Raft是一个来自 Stanford 的新的一致性算法,适用于分布式系统的日志复制,Raft通过选举的方式来实现一致性,在 Raft 中,任何一个节点都可能成为 leaderGoogle 的容器集群管理系统 Kubernetes、开源PaaS 平台 Cloud FoundryCoreOSFleet 都广泛使用了etcd

Etcd 的特性?

在分布式系统中,如何管理节点间的状态一直是一个难题,etcd像是专门为集群环境的服务发现和注册而设计,它提供了数据 TTL 失效、数据改变监视、多值、目录监听、分布式锁原子操作等功能,可以方便的跟踪并管理集群节点的状态。Etcd的特性如下:

  • 简单: curl可访问的用户的APIHTTP+JSON
  • 安全: 可选的 SSL 客户端证书认证
  • 快速: 单实例每秒 1000 次写操作
  • 可靠: 使用 Raft 算法保证一致性

为什么需要 Etcd?

所有的分布式系统,都面临的一个问题是多个节点之间的数据共享问题,这个和团队协作的道理是一样的,成员可以分头干活,但总是需要共享一些必须的信息,比如谁是 leader,都有哪些成员,依赖任务之间的顺序协调等。所以分布式系统要么自己实现一个可靠的共享存储来同步信息(比如Elasticsearch),要么依赖一个可靠的共享存储服务,而Etcd 就是这样一个服务。

Etcd主要提供以下能力:

  • 提供存储以及获取数据的接口,它通过协议保证 etcd 集群中的多个节点数据的强一致性。用于存储元信息以及共享配置。
  • 提供监听机制,客户端可以监听某个 key 或者某些 key 的变更(v2v3 的机制不同)。用于监听和推送变更。
  • 提供 key 的过期以及续约机制,客户端通过定时刷新来实现续约(v2v3 的实现机制也不一样)。用于集群监控以及服务注册发现。
  • 提供原子的 CASCompare-and-Swap)和CADCompare-and-Delete)支持(v2 通过接口参数实现,v3通过批量事务实现)。用于分布式锁以及 leader 选举。

第三方库和客户端工具

目前有很多支持 etcd 的库和客户端工具,比如命令行客户端工具 etcdctlGo 客户端 go-etcdJava 客户端 jetcdPython 客户端 python-etcd 等等。

背景

回归正题,一起谈谈如何借助 etcd 进行 leader 选举实现高可用吧。

首先说下背景,现在集群上有一个服务,我希望它是高可用的,当一个节点上的这个服务挂掉之后,另一个节点就会起一个同样的服务,从而保证服务不中断。

这种场景应该比较常见,比如 MySQL 高可用、NFS高可用等等,而这些高可用的实现方式往往是通过 keepalivedctdb 等组件,对外暴露一个虚拟 IP 提供服务。

技术选型

那为什么不采用上面提到的技术实现高可用呢?首先这些技术很成熟,的确很好用,但是不适用于所有场景,它们比较适合对外提供读写服务的场景,而并不是所有服务都是对外服务的;其次,在已经存在 etcd 的集群环境上并且借助 etcd 可以达到高可用的情况下没有必要再引入其他组件;然后,在第三方库和客户端工具上,etcd有很大优势;最后,因为 Raft 算法的关系,在一致性上面 etcd 做的也比上面这几个要好。

核心:TTL & CAS

Etcd进行 leader 选举的实现主要依赖于 etcd 自带的两个核心机制,分别是 TTL 和 Atomic Compare-and-Swap。TTLtime to live)指的是给一个 key 设置一个有效期,到期后这个 key 就会被自动删掉,这在很多分布式锁的实现上都会用到,可以保证锁的实时有效性。Atomic Compare-and-SwapCAS)指的是在对 key 进行赋值的时候,客户端需要提供一些条件,当这些条件满足后,才能赋值成功。这些条件包括:

  • prevExistkey当前赋值前是否存在
  • prevValuekey当前赋值前的值
  • prevIndexkey当前赋值前的Index

这样的话,key的设置是有前提的,需要知道这个 key 当前的具体情况才可以对其设置。

设计原理

所以我们可以这样设计:

  • 先定义一个 key,用作于选举;定义key 对应的 value,每个节点定义的value 需要能够唯一标识;
  • 定义 TTL 周期,各节点客户端运行周期为 TTL/2,这样可以保证key 可以被及时创建或更新;
  • 启动时,每个客户端尝试 cas create key,并设置TTL,如果创建不成功,则表示抢占失败;如果创建成功,则抢占成功,并且给key 赋值了可以唯一标识自己的value,并设置TTL
  • 客户端 TTL/2 定期运行,每个客户端会先 get 这个 keyvalue,跟自己节点定义的 value 相比较,如果不同,则表示自己角色是 slave,所以接下来要做的事就是周期去cas create key,并设置TTL;如果相同,则表示自己角色是master,那么就不需要再去抢占,只要更新这个keyTTL,延长有效时间;
  • 如果 master 节点中途异常退出,那么当 TTL 到期后,其他 slave 节点则会抢占到并选举出新的master

具体实现

环境参数:

etcd:v2
client:python-etcd

定义参数

  • 定义存储目录名称为 etcd_watcheretcd_watcher 所有相关的 key 都在该目录下;
  • 定义用于选举的 key 名称为 master,定义每个节点赋该key 的值为本节点的 hostname 用作唯一标识;
  • 定义 TTL60s,这样 etcd_watcher 定期执行的时间为30s
  • 定义 etcd_watcher 六种角色,分别为:

    • Master:上一次运行角色为Master,当前运行角色仍为Master
    • Slave:上一次运行角色为Slave,当前运行角色仍为Slave
    • ToMaster:上一次运行角色为Slave,当前运行角色为Master
    • ToSlave:上一次运行角色为Master,当前运行角色为Slave
    • InitMaster:上一次运行角色为None,当前运行角色为Master
    • InitSlave:上一次运行角色为None,当前运行角色为Slave
class EtcdClient(object):                                                          
    def __init__(self):                                                            
        self.hostname = get_hostname()                                             
        self.connect()                                                             
        self.ttl = 60                                                              
        self.store_dir = '/etcd_watcher'                                           
        self.master_file = '{}/{}'.format(self.store_dir, 'master')                
        self.master_value = self.hostname                                          
        # node role                                                              
        self.Master = 'Master'                                                     
        self.Slave = 'Slave'                                                       
        self.ToMaster = 'ToMaster'                                                 
        self.ToSlave = 'ToSlave'                                                   
        self.InitMaster = 'InitMaster'                                             
        self.InitSlave = 'InitSlave'                                               
        # node basic status: master or slave                                            
        self.last_basic_status = None                                              
        self.current_basic_status = None                                           

连接 Etcd

Etcd支持 httpsssl认证,所以 etcd 集群做了这些安全配置的话,需要在实例化 Client 的时候配置 protocolcertca_cert 选项。

这里不得不吐槽一下 python-etcd 的官方文档,连这些配置都未说明,还是笔者去翻源码才找到的。。。

    def connect(self):                                                             
        try:                                                                       
            self.client = etcd.Client(                                             
                host='localhost',                                                  
                port=2379,                                                         
                allow_reconnect=True,                                              
                protocol='https',                                                  
                cert=('/etc/ssl/etcd/ssl/node-{}.pem'.format(self.hostname),         
                    '/etc/ssl/etcd/ssl/node-{}-key.pem'.format(self.hostname)   
                ),                                                                 
                ca_cert='/etc/ssl/etcd/ssl/ca.pem'                                 
            )                                                                      
        except Exception as e:                                                     
            logger.error("Connect etcd failed: {}".format(str(e))) 

获取节点基本状态

CAS采用两个条件:prevValueprevExist,下面是三个最基础的函数:

  • 创建master-key:争抢master
  • 获取 master-key:获取master 的值
  • 更新 master-key:更新masterTTL
    def create_master(self):                                                    
        logger.info('Create master.')                                           
        try:                                                                    
            self.client.write(                                                  
                self.master_file,                                               
                self.master_value,                                              
                ttl=self.ttl,                                                   
                prevExist=False                                                 
            )                                                                   
        except Exception as e:                                                  
            logger.error("Create master failed: {}".format(str(e)))             
                                                                                
    def get_master(self):                                                       
        try:                                                                    
            master_value = self.get(self.master_file)                           
            return master_value                                                 
        except etcd.EtcdKeyNotFound:                                            
            logger.error("Key {} not found.".format(self.master_file))          
        except Exception as e:                                                  
            logger.error("Get master value failed: {}".format(str(e)))          
                                                                                
    def update_master(self):                                                    
        try:                                                                    
            self.client.write(                                                  
                self.master_file,                                               
                self.master_value,                                              
                ttl=self.ttl,                                                   
                prevValue=self.master_value,                                    
                prevExist=True                                                  
            )                                                                   
        except Exception as e:                                                  
            logger.error("Update master failed: {}".format(str(e)))      

获取当前节点基本状态是 Master 还是Slave

    def get_node_basic_status(self):                                            
        node_basic_status = None                                                
        try:                                                                    
            master_value = self.get_master()                                    
            if master_value == self.master_value:                               
                node_basic_status = self.Master                                 
            else:                                                               
                node_basic_status = self.Slave                                  
        except Exception as e:                                                  
            logger.error("Get node basic status failed: {}".format(str(e)))     
        return node_basic_status                              

获取节点角色

获取当前节点基本状态,跟上一次的基本状态作比较,得到最终的角色,这里的角色分为上述的六种。

    def get_node_status(self):                                                     
        self.last_basic_status = self.current_basic_status                         
        self.current_basic_status = self.etcd_client.get_node_basic_status()       
        node_status = None                                                         
        if self.current_basic_status == self.Master:                               
            if self.last_basic_status is None:                                     
                node_status = self.InitMaster                                      
            elif self.last_basic_status == self.Master:                            
                node_status = self.Master                                          
            elif self.last_basic_status == self.Slave:                             
                node_status = self.ToMaster                                        
            else:                                                                  
                logger.error("Invalid last basic status for master: {}".format(self.last_basic_status)                                        
                )                                                                  
        elif self.current_basic_status == self.Slave:                              
            if self.last_basic_status is None:                                     
                node_status = self.InitSlave                                       
            elif self.last_basic_status == self.Master:                            
                node_status = self.ToSlave                                         
            elif self.last_basic_status == self.Slave:                             
                node_status = self.Slave                                           
            else:                                                                  
                logger.error("Invalid last basic status for slave: {}".format(self.last_basic_status)                                        
                )                                                                  
        else:                                                                      
            logger.error("Invalid current basic status: {}".format(self.current_basic_status)                                         
            )                                                                      
                                                                                   
        return node_status         

服务高可用

根据当前节点的角色,协调服务做对应的工作,保证高可用。

    def run(self):                                                              
        try:                                                                    
            logger.info("===== Init Etcd Wathcer =====")                        
            self.etcd_client.create_master()                                    
            while True:                                                         
                node_status = self.get_node_status()                            
                logger.info("node status : {}".format(node_status))             
                if node_status == self.etcd_client.ToMaster:                    
                    self.do_ToMaster_work()                                     
                    self.etcd_client.update_master()                            
                elif node_status == self.etcd_client.InitMaster:                
                    self.do_InitMaster_work()                                   
                    self.etcd_client.update_master()                            
                elif node_status == self.etcd_client.Master:                    
                    self.etcd_client.update_master()                            
                elif node_status == self.etcd_client.ToSlave:                   
                    self.do_ToSlave_work()                                      
                    self.etcd_client.create_master()                            
                elif node_status == self.etcd_client.InitSlave:                 
                    self.do_InitSlave_work()                                    
                    self.etcd_client.create_master()                            
                elif node_status == self.etcd_client.Slave:                     
                    self.etcd_client.create_master()                            
                else:                                                           
                    logger.error("Invalid node status: {}".format(node_status)) 
                time.sleep(self.interval)                                       
                self.etcd_client = EtcdClient()                                 
        except Exception:                                                       
            logger.error("Etcd watcher run error:{}".format(traceback.format_exc()))

Refer

  1. Etcd V2 API
  2. Python-etcd doc
  3. etcd 使用经验总结
  4. Etcd 架构与实现解析
  5. ETCD 实现 leader 选举
  6. 主从系统的实现
  7. 利用 ETCD 进行多 Mater 模块容灾
  8. python 使用 etcd 来实现配置共享及集群服务发现【上】

总结

本文通过 etcd 自带的特性进行选举,然后通过选举机制实现了服务的高可用。只要 etcd 集群正常运行,服务就可以达到主备容灾的效果。该 watcher 可以应用于任意服务,并且可以一对多,多个服务可以共用一套选举机制,避免使用多套高可用组件。大家对该项目有兴趣的话,可以去 Github 上详细阅读,喜欢的话请点个赞哦(#^.^#)。

项目地址:https://github.com/tony-yin/e…

退出移动版