Zookeeper-扩展之殇

一、背景基于公司发展硬性需求,生产VM服务器要统一迁移到ZStack 虚拟化服务器。检查自己项目使用的服务器,其中zookeeper集群中招,所以需要进行迁移。 二、迁移计划为了使迁移不对业务产生影响,所以最好是采用扩容 -> 缩容 的方式进行。 说明:1.原生产集群为VM-1,VM-2,VM-3组成一个3节点的ZK集群;2.对该集群扩容,增加至6节点(新增ZS-1,ZS-2,ZS-3),进行数据同步完成;3.进行缩容,下掉原先来的三个节点(VM-1,VM-2,VM-3);4.替换nginx解析地址。OK! 目标很明确,过程也很清晰,然后开干。三、步骤 (过程已在测试环境验证无问题):对新增的三台服务器进行zk环境配置,和老集群配置一样即可,最好使用同一版本(版主使用的是3.4.6);对老节点的zoo.cfg 增加新集群的地址(逐一增加),然后对新增加节点逐一重启。 四、问题ZS-1 启动成功,zkServer.sh status 报错,用zkServer.sh status查看,反馈如下异常:[root@localhost bin]# ./zkServer.sh statusZooKeeper JMX enabled by defaultUsing config: /usr/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfgError contacting service. It is probably not running.此时查看数据,数据同步正常ZS-1 数据同步正常,但是无法查看节点的状态信息;怀疑是因为老节点没有重启的原因;此时去查看原集群节点信息,发现原集群节点状态异常。经排查定位,原集群的状态一直处于异常状态。初步定位原因可能是原集群的选举存在异常,导致新节点无法正常纳入,继续排查。恢复集群初始状态,如果集群节点的状态一直没法正常查看。OK 继续定位...五、排查过程以下方法来自于网络: 可能有以下几个原因:第一、zoo.cfg文件配置:dataLogDir指定的目录未被创建。 1.zoo.cfg[root@SIA-215 conf]# cat zoo.cfg...dataDir=/app/zookeeperdata/datadataLogDir=/app/zookeeperdata/log...2.路径[root@SIA-215 conf]# cd /app/zookeeperdata/[root@SIA-215 zookeeperdata]# lltotal 8drwxr-xr-x 3 root root 4096 Apr 23 19:59 datadrwxr-xr-x 3 root root 4096 Aug 29 2015 log经排查 排除该因素。 第二、myid文件中的整数格式不对,或者与zoo.cfg中的server整数不对应。 [root@SIA-215 data]# cd /app/zookeeperdata/data[root@SIA-215 data]# cat myid 2[root@SIA-215 data]# 定位排查后排除不是该原因。 ...

May 29, 2019 · 2 min · jiezi

zookeeper第4篇使用客户端API来操作ZK

Java 客户端pom.xml 文件中引入相关api <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.4-beta</version></dependency>创建连接/** * 创建 zookeeper 会话 * <p> * <p> * zookeeper 客户端 和 服务端创建会话的过程是异步的。也就是客户度通过构造方法创建会话后立即返回,此时的连接并没有完全建立。 * 当真正的会话建立完成后,zk服务端会给客户端通知一个事件,客户端获取通知之后在表明连接正在建立。 */public class ZooKeeperClientSession implements Watcher { //用于等待zk服务端通知 private static CountDownLatch latch = new CountDownLatch(1); public static void main(String[] args) throws Exception { ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2183", 5000, new ZooKeeperClientSession()); System.out.println(zooKeeper.getState()); latch.await(); long sessionId = zooKeeper.getSessionId(); byte[] sessionPasswd = zooKeeper.getSessionPasswd(); System.out.println(zooKeeper.getSessionId()); /** * 利用 sessionId 和 sessionPasswd 复用会话连接 */ ZooKeeper zooKeeper1 = new ZooKeeper("127.0.0.1:2183", 5000, new ZooKeeperClientSession(), sessionId, sessionPasswd); System.out.println(zooKeeper1.getSessionId()); } /** * 处理 zookeeper 服务端的 Watcher 通知 * @param watchedEvent */ public void process(WatchedEvent watchedEvent) { System.out.println("receive watch event : " + watchedEvent); if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { latch.countDown(); } }}1.构造函数说明ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)参数作用connectStringzk服务器列表,由英文逗号分开的字符串,例如:127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183;也可以是带有目录的字符:127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/zk-booksessionTimeout会话超时时间,以毫秒为单位。在一个会话周期内,zk客户端和服务端通过心跳来检查连接的有效性,一旦在sessionTimeout时间内没有进行心跳检测,则会话失效watcherzk允许客户端在构造方法中传入一个Watcher接口实现类作为事件通知处理器sessionId、sessionPasswd利用sessionId 和 sessionPasswd 确保复用会话连接canBeReadOnly用于标识当前会话是否支付只读模式。在zk集群模式中,如果一台集群和集群中过半以上的机器都都失去了网络连接,那么这个机器将不再处理客户端请求,包括读写请求。但在某些情况下出现类似问题,我们希望该台机器能够处理读请求,此时为 read-only 模式创建节点

May 22, 2019 · 1 min · jiezi

zookeeper第3篇zookeeper-安装

单机版安装进入zooKeeper官网:http://zookeeper.apache.org/,找到你想下载的版本,我这里下载的是 zookeeper 3.5.4 版本。 把 zookeeper-3.5.4.tar.gz 放到指定目录/Users/shifeifei/Software/zk-3.5.4这是我的目录。 执行解压命令: tar -zxvf zookeeper-3.5.4.tar.gz得到目录 , zookeeper-3.5.4-beta,可观察到zookeeper的目录结构及内容: zookeeper 环境变量配置vim ~/.bash_profileexport ZK_HOME=/Users/shifeifei/Software/zk-3.5.4/zookeeper-3.5.4-betaexport PATH=$ZK_HOME/bin:$PATHsource ~/.bash_profilezookeeper 相关配置配置 zoo.cfg 文件进入到目录 /Users/shifeifei/Software/zk-3.5.4/zookeeper-3.5.4-beta/conf修改文件 zoo_sample.cfg 文件内容,并重新命名为 zoo.cfg tickTime=2000initLimit=10syncLimit=5dataDir=/Users/shifeifei/Software/zk-3.5.4/zk-data/dataLogDir=/Users/shifeifei/Software/zk-3.5.4/zk-log/clientPort=2181#maxClientCnxns=60#autopurge.snapRetainCount=3#autopurge.purgeInterval=1server.1=127.0.0.1:2888:3888其中 dataDir 和 dataLogDir 目录是你自定义指定的。 创建 myid 文件,该文件在 dataDir 所表示的目录下 文件内容是:server.1=127.0.0.1:2888:3888 中的 server.1 的 1 启动服务进入目录 /Users/shifeifei/Software/zk-3.5.4/zookeeper-3.5.4-beta/bin执行命令:./zkServer.sh start

May 22, 2019 · 1 min · jiezi

zookeeper选举源码分析

在 zookeeper 集群中发生选举的场景有以下三种: 集群启动时Leader 节点重启时Follower 节点重启时本文主要针对集群启动时发生的选举实现进行分析。 ZK 集群中节点在启动时会调用QuorumPeer.start方法public synchronized void start() { /** * 加载数据文件,获取 lastProcessedZxid, currentEpoch,acceptedEpoch */ loadDataBase(); /** * 启动主线程 用于处理客户端连接请求 */ cnxnFactory.start(); /** * 开始 leader 选举; 会相继创建选举算法的实现,创建当前节点与集群中其他节点选举通信的网络IO,并启动相应工作线程 */ startLeaderElection(); /** * 启动 QuorumPeer 线程,监听当前节点服务状态 */ super.start();}加载数据文件在 loadDataBase 方法中,ZK 会通过加载数据文件获取 lastProcessedZxid , 并通过读取 currentEpoch , acceptedEpoch 文件来获取相对应的值;若上述两文件不存在,则以 lastProcessedZxid 的高 32 位作为 currentEpoch , acceptedEpoch 值并写入对应文件中。 初始选举环境synchronized public void startLeaderElection() { try { // 创建投票 currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); } catch(IOException e) { } // 从集群中节点列表,查找当前节点与其他进行信息同步的地址 for (QuorumServer p : getView().values()) { if (p.id == myid) { myQuorumAddr = p.addr; break; } } if (myQuorumAddr == null) { throw new RuntimeException("My id " + myid + " not in the peer list"); } // electionType == 3 this.electionAlg = createElectionAlgorithm(electionType);}protected Election createElectionAlgorithm(int electionAlgorithm){ Election le=null; //TODO: use a factory rather than a switch switch (electionAlgorithm) { // 忽略其他算法的实现 case 3: /** * 创建 QuorumCnxManager 实例,并启动 QuorumCnxManager.Listener 线程用于与集群中其他节点进行选举通信; */ qcm = createCnxnManager(); QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ listener.start(); /** * 创建选举算法 FastLeaderElection 实例 */ le = new FastLeaderElection(this, qcm); } else { LOG.error("Null listener when initializing cnx manager"); } break; default: assert false; } return le;}初始节点的相关实例之后,执行 super.start() 方法,因 QuorumPeer 类继承 ZooKeeperThread 故会启动 QuorumPeer 线程 ...

May 16, 2019 · 11 min · jiezi

ZooKeeper安装效果演示

高可用(HA) -- ZooKeeperZooKeeper一个开源的分布式的,为分布式应用提供服务的项目提供原语集合以便分布式应用可以在它之上构建更高层次的同步服务角色 观察者模式: leader: 领导者负责进行投票的发起及决议, 更新状态 学习者: follower: 接受客户端请求并发挥客户端返回结果,参与投票 observer: 接受请求,转发给leader,不参与投票,只同步leader. 客户端: 发起请求 观察者模式的应用: 软件皮肤, 编辑工具设置安装 伪分布模式 1)安装ZooKeeper (再次注意权限)$ tar -zxf /opt/software/zookeeper-3.4.5.tar.gz -C /opt/modules/2)新建一个Zookeeper的data目录$ mkdir zkData --//可以不用手动创建,启动自动生成3)修改配置文件${ZOOKEEPER_HOME}/conf (注意: 配置文件为模板,需拷贝重名为zoo.cfg)$cd /opt/modules/zookeeper-3.4.5/ ##切换目录到zookeeper-3.4.5$ cp conf/zoo_sample.cfg conf/zoo.cfg ##拷贝$ vi conf/zoo.cfg ##修改12行,设置以下:dataDir=/opt/modules/zookeeper-3.4.5/zkData4)启动zookeeper$bin/zkServer.sh start$ jps #查看java进程如下2088 QuorumPeerMain5)查看zookeeper的状态$bin/zkServer.sh status #信息如下JMX enabled by defaultUsing config: /opt/modules/zookeeper-3.4.5/bin/../conf/zoo.cfgMode: standalone #单机模式6)一些命令的认识$ bin/zkCli.sh #进入zookper help #查看命令 quit #退出 create #创建 -e临时znode -s 自动编号 get path #查看信息 ls path #查看指定目录的列表 rmr path #删除ls / #查看根目录create -e /myapp msg #创建目录 get /myapp #查看myapp创建信息 ls / watch # 添加关注事件rmr /myapp #删除触发关注事件 quit 完全分布模式 ...

May 12, 2019 · 4 min · jiezi

Zookeeper学习系列三Zookeeper-集群架构读写机制以及一致性原理ZAB协议

前言同学们,在上一章中,我们主要讲了Zookeeper两种启动模式以及具体如何搭建。本章内容主要讲的是集群相关的原理内容,第一章可以当做是Zookeeper原理篇的基础部分,本章则是Zookeeper原理篇进阶部分,有关于Zookeeper集群的读写机制、ZAB协议的知识解析。 本篇的内容主要包含以下几点: Zookeeper 集群架构Zookeeper 读写机制 ZAB协议关于Zookeeper 集群的一些其他讨论 Zookeeper(读性能)可伸缩性 和 Observer节点Zookeeper 与 CAP 理论Zookeeper 作为 服务注册中心的局限性一、Zookeeper 集群架构接下来我们来说一说Zookeeper的集群架构。 Zookeeper 集群中的角色第一章提过,Zookeeper中,能改变ZooKeeper服务器状态的操作称为事务操作。一般包括数据节点创建与删除、数据内容更新和客户端会话创建与失效等操作。Leader 领导者 :Leader 节点负责Zookeeper集群内部投票的发起和决议(一次事务操作),更新系统的状态;同时它也能接收并且响应Client端发送的请求。Learner 学习者 Follower 跟随者 : Follower 节点用于接收并且响应Client端的请求,如果是事务操作,会将请求转发给Leader节点,发起投票,参与集群的内部投票,Observer 观察者:Observer 节点功能和Follower相同,只是Observer 节点不参与投票过程,只会同步Leader节点的状态。Client 客户端Zookeeper 通过复制来实现高可用。在上一章提到的集群模式(replicated mode)下,以Leader节点为准,Zookeeper的ZNode树上面的每一个修改都会被同步(复制)到其他的Server 节点上面。 上面实际上只是一个概念性的简单叙述,在看完下文的读写机制和ZAB协议的两种模式之后,你就会对这几种角色有一个更加深刻的认识。二、Zookeeper 读写机制读写流程下图就是集群模式下一个Zookeeper Server节点提供读写服务的一个流程。 如上图所示,每个Zookeeper Server节点除了包含一个请求处理器来处理请求以外,都会有一个内存数据库(ReplicatedDatabase) 用于持久化数据。ReplicatedDatabase 包含了整个Data Tree。 来自于Client的读服务(Read Requst),是直接由对应Server的本地副本来进行服务的。 至于来自于Client的写服务(Write Requst),因为Zookeeper要保证每台Server的本地副本是一致的(单一系统映像),需要通过一致性协议(后文提到的ZAB协议)来处理,成功处理的写请求(数据更新)会先序列化到每个Server节点的本地磁盘(为了再次启动的数据恢复)再保存到内存数据库中。 集群模式下,Zookeeper使用简单的同步策略,通过以下三条基本保证来实现数据的一致性: 全局串行化所有的写操作 串行化可以把变量包括对象,转化成连续bytes数据. 你可以将串行化后的变量存在一个文件里或在网络上传输. 然后再反串行化还原为原来的数据。保证同一客户端的指令被FIFO执行(以及消息通知的FIFO) FIFO -先入先出自定义的原子性消息协议 简单来说,对数据的写请求,都会被转发到Leader节点来处理,Leader节点会对这次的更新发起投票,并且发送提议消息给集群中的其他节点,当半数以上的Follower节点将本次修改持久化之后,Leader 节点会认为这次写请求处理成功了,提交本次的事务。 乐观锁Zookeeper 的核心思想就是,提供一个非锁机制的Wait Free 的用于分布式系统同步的核心服务。其核心对于文件、数据的读写服务,并不提供加锁互斥的服务。 但是由于Zookeeper的每次更新操作都会更新ZNode的版本(详见第一章),也就是客户端可以自己基于版本的对比,来实现更新数据时的加锁逻辑。例如下图。 就像我们更新数据库时,会新增一个version字段,通过更新前后的版本对比来实现乐观锁。 三、ZAB协议终于到了ZAB协议,讲述完ZAB协议,大家对Zookeeper的一些特性会有更深的体会,对本文的其他内容也会有更透彻的理解。 ...

May 12, 2019 · 1 min · jiezi

Zookeeper学习系列二Zookeeper-集群章节之集群搭建

前言同道们,好久不见,上一章中,我主要讲了Zookeeper的一些基础的知识点。数据模型 + 原语集 + Watches机制。本章内容主要讲的是集群搭建相关的知识。 本篇的内容主要包含以下几点: Zookeeper 运行模式Zookeeper 搭建一、Zookeeper 运行模式Zookeeper 有两种运行模式,单点模式和集群模式。 单点模式(standalone mode)- Zookeeper 只运行在单个服务器上,常用于开发测试阶段,这种模式比较简单,但是不能保证Zookeeper服务的<font color= 'red'>高可用性</font>和<font color= 'red'>恢复性</font>。集群模式(replicated mode)- 英文原文这种模式叫做“复制模式”;这个模式下,Zookeeper运行于一个集群上,适合生产环境。 同一个集群下的server节点被称为quorum,翻译过来就是“一个正式会议的法定人数”,如果你看完下一章介绍的ZAB协议的两种模式之后,应该会觉得这个比喻实际上很形象。 NOTE: 在集群模式下,最少需要三个server节点。并且官方推荐你使用奇数数量的server节点来组成集群。至于为什么,和Zookeeper的读写策略和一致性协议有关,在后面的章节会介绍。二、Zookeeper 搭建单点模式使用过zookeeper的同学们应该知道,启一个zookeeper server 非常简单,如果是单点模式,只需要以下步骤: 去官网下载对应源码压缩包,然后上传到服务器解压,tar -zxvf zookeeper-***.tar.gz进入到源码目录下的conf目录,根据zoo_sample.cfg,创建一个配置文件zoo.cfg,启动时默认就是按照zoo.cfg这个配置文件的信息来启动 # Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,# 也就是每个 tickTime 时间就会发送一个心跳。tickTime=2000 # Zookeeper 保存数据的目录dataDir=/data/zk/data # Zookeeper 保存日志文件的目录dataLogDir=/data/zk/log# 客户端连接Zookeeper 服务器的端口# Zookeeper 会监听这个端口,接受客户端的访问请求clientPort=2181进入到源码目录下的bin目录,执行zkServer.sh脚本文件即可 # 启动zkServer.sh start# 关闭zkServer.sh stop查看当前zookeeper 状态 [root@localhost bin]# sh zkServer.sh status ZooKeeper JMX enabled by defaultUsing config: /opt/zookeeper-3.4.8/bin/../conf/zoo.cfgMode: standaloneMode:standalone可以看到现在的节点启动类型。 集群搭建集群模式的搭建和单点模式的差别不大,如果按照官方的最低要求,就是三台服务器,在这三台服务器上面分别执行一下上述单点模式的步骤,同一集群下的每台服务器的配置文件类似。 ...

May 12, 2019 · 2 min · jiezi

Zookeeper

什么是Zookeeper?百科上面怎么说的? Zookeeper 是由 Apache 的一个顶级项目, 主要用来做分布式集群服务的注册与发现。意思是新建一个服务了,把它"写入"Zookeeper。然后服务有什么变动或者故障啦,它也能通过某种机制及时“发现”。从而告诉其他小伙伴,这个服务不可用啦,不要去调它啦,Zookeeper就是专门干这样的事。 Zookeeper听说有个session机制,如何理解?启动一个客户端,这个客户端初始化的时候就会connecting,连上了客户端就会处于connected的状态。这样就建立了一个session,然后通过心跳机制(客户端向服务端定时发送ping请求)保持session。如果检查心跳结束,即没人ping啦,那肯定知道是这个服务死了,就会设置session为过期,把这个节点的所有数据清掉。Zookeeper服务端就是通过这种方式来知道客户端是否还活着的。 只有leader才有权写入,follower只能读取,然后通过某种方式同步一下数据。 Zookeeper集群什么时候才要选举?以及选举决议的办法?

May 6, 2019 · 1 min · jiezi

共识问题

共识:一致同意,完整(只决定一次),有效,终止(宕机不回来)。要多数都同意,很慢。paxos完全符合,单raft,zap考虑的是宕机还会回来的情况,用日志保证。能解决诸如以下问题: 全序广播相当于重复多伦共识:但raft和zap等直接实现全序广播内有一次一值的共识。单领导者选取:1选出一位领导者,2对领导者的提议进行表决(防止1,一个节点相信自己是领导)投票是同步的,动态成员扩展难,依靠超时检测节点失效,若只有一条特定网络不可靠,会进入领导频繁二人转局面共识算法raft数据一致性是通过日志复制的方式,client发给leader(写只发给leader,follower备份恢复用),leader写入日志,同步给follower,当多数follower写入日志并返回给leader时,leader提交数据,返回给客户端确认消息, 发给follower数据已提交,follower提交数据,发回确认给leader。所有的发送都随着调频发过去。raft中所有server之间的通信都是RPC调用,并且只有两种类型的RPC调用:第一种是RequestVote,用于选举leader;第二种是AppendEntries。日志和投票结果都需要持续化写在磁盘中,保证宕机后重启任然正常。 leader(有任期字段term),candidate, follower.每个节点有在T到2T之间随机选择超时时间。leader和follower通过跳频联系。当一个follower收不到leader的跳频超时时将发起投自己的票。任何一个follower只能投一票。当一轮投票结束有多个候选者时,这几个候选者重新分配随机的超时时间。 当确认提交后,leader会一直不断地重试提交的rpc给follower、重试,直到请求成功;即使follower宕机了,重启后leader仍会接着发请求,直到请求成功,当leader宕机,如何向follower继续发;1.leader的日志只能增加,=》所以在选择时选term大,log长的 2.leader会把自己的log复制到其他机器,如果新达到多数并且此任期已有数据过半(挂前的一次数据不会被重复提交)就提交,只提交新任期的,同步还是要同步。为了恢复log一致性,leader为集群中所有follower都保存一个状态变量,即nextIndex:1)nextIndex是leader准备向某个follower发送的下一个log entry的index;2)当leader刚刚即位后,nextIndex的初始值是(1+leader's last index);当leader看到请求被拒绝时,其动作非常简单:只需将nextIndex-1,再次尝试。 term需要存盘任意一个server在一个term内只能投出一票;一旦已经投给了一个candidate,它必须拒绝其他candidate的投票请求;其实server根本不在意把票投给谁,它只会把票投给最先到请求到它的candidate;为了保证这一点,必须把投票信息持久保存到磁盘上,这样可以保证即使该server投完票后宕机,稍后又立即重启了,也不会在同一个term内给第二个candidate投票了。每个日志entry:iterm+index.每次发送AppendEntries时需要带上一次的,检查是否一样,一样才接受来保证所有机器log一致, paxosbasic paxos 这里有个错误。第二阶段若N>=ResN,接受提案,若N<ResN不接受。实际上这里的proposal是leader。共识算法正常是proposor,leader,accepter,leaner(先忽略),用来决议proposer的提议号和是否成功的。每次proposal先到leader(可随机选取,不重要),leader发给accepter若没有冲突返回any否则返回已选的,继续上述过程。 问题:多个Proposal可能出现死锁一直循环递增N的情况: 上面这个是https://www.microsoft.com/en-... 为了方便理解,去除了实现细节。实时上再应用中,客户端不会自己处理冲突+1再次投票和发送给其他leaner,这些应该由另一个角色,在basic中,由一群c协调者,可以和acceptor一样,或者是其中的部分构成,每轮随机一个c作为leader,负责收集本轮结果和通知leaner。proposal->leader(每个client随机发就可以作为本轮leader)->pre->acceptors返回最大N的值V->带N请求->acceptors->leader->返回给proposal->client失败或者成功或再次投票->投票成功后发给leaner。此过程中CLIENT2再次发送是另一个leader。 fast paxos 若proposal和acceptor,leader,leaner都是分布式,且要持久化,持久化+发送来回的代价就多了,若leader发现没有冲突,不再参与,proposal直接提交给acceptor(同一轮只投给先到的),直接发送给leaner,可以理解为基于乐观锁的思想,leaner和CLIENT都自行决议, 若proposal没有决策成功(先到的就是投票,没有半数以上的),1.重新引入leader,异步发送给协调者,协调者选择(因为acceptor只投一次),发给proposal结果。(再次引入leader)2.无leader,在acceptor决议后发送给所有acceptor,其他acceptor收到此消息后对i+1轮的可以比较投票(即使同时刻一个一半也可以再比较投一次)。https://www.microsoft.com/en-...muti-paxos 当leader稳定,可以省去prepare阶段 具体做法如下: ① 当某个副本节点通过选举成为Master后,就会使用新分配的编号N来广播一个Prepare消息,该Prepare消息会被所有未达成一致的Instance和目前还未开始的Instance共用。 ② 当Acceptor接收到Prepare消息后,必须对多个Instance同时做出回应,这通常可以通过将反馈信息封装在一个数据包中来实现,假设最多允许K个Instance同时进行提议值的选定,那么: -当前之多存在K个未达成一致的Instance,将这些未决的Instance各自最后接受的提议值封装进一个数据包,并作为Promise消息返回。 -同时,判断N是否大于当前Acceptor的highestPromisedNum值(当前已经接受的最大的提议编号值),如果大于,那么就标记这些未决Instance和所有未来的Instance的highestPromisedNum的值为N,这样,这些未决Instance和所有未来Instance都不能再接受任何编号小于N的提议。 ③ Master对所有未决Instance和所有未来Instance分别执行Propose->Accept阶段的处理,如果Master能够一直稳定运行的话,那么在接下来的算法运行过程中,就不再需要进行Prepare->Promise处理了。但是,一旦Master发现Acceptor返回了一个Reject消息,说明集群中存在另一个Master并且试图使用更大的提议编号发送了Prepare消息,此时,当前Master就需要重新分配新的提议编号并再次进行Prepare->Promise阶段的处理。 可见chubby就是一个典型的Muti-Paxos算法应用,在Master稳定运行的情况下,只需要使用同一个编号来依次执行每一个Instance的Promise->Accept阶段处理。 raft和paxos区别raft要有一个leader。在选主时每个follower只能投一次,不成功随机时间下一次。有主时的共识由主来给日志编号,比较就好。follower保证稳定可替换即可。paxos leader不能那么重要(fast paxos在无冲突时甚至无leader参与),每次可以随机选,只是汇总投票,prososol是否通过由多数决定,prososol回复客户端和同步其他leaner。算是无主的模型。zap还是有leader的。zap在无主的时候选举算法和fast paxos很像,有最大xid(类似pre阶段,只不过是上次存好的),每次投票直接给acceptor并且无协调者的冲突处理。在有主时,用paxos的思想先pre收集并同步信息保证一致,主处理写,多数处理成功后回复。 优势就是单主能不能抗住了。 zookeeperZookeeper对于每个节点QuorumPeer的设计相当的灵活,QuorumPeer主要包括四个组件:客户端请求接收器(ServerCnxnFactory)、数据引擎(ZKDatabase)、选举器(Election)、核心功能组件(Leader/Follower/Observer不同) 采用了递增的事务id号(zxid)来标识事务。所有的提议(proposal)都在被提出的时候加上了zxid。实现中zxid是一个64位的数字,它高32位是epoch用来标识leader关系是否改变,每次一个leader被选出来,它都会有一个新的epoch,标识当前属于那个leader的统治时期。低32位用于递增计数。 本身的数据组织以文件形式。 作用1.单独zk集群元数据的可靠性和一致性保证,元数据保存在zk所有副本中(少量完全可以放在内存中数据)路由,选择数据库,调度程序2.单独zk集群,锁,防护令牌,获取锁或者zxid 3.变更通知,每个变更都会发送到所有节点watch机制4.用于检测,服务发现session:每个ZooKeeper客户端的配置中都包括集合体中服务器的列表。在启动时,客户端会尝试连接到列表中的一台服务器。如果连接失败,它会尝试连接另一台服务器,以此类推,直到成功与一台服务器建立连接或因为所有ZooKeeper服务器都不可用而失败。只要一个会话空闲超过一定时间,都可以通过客户端发送ping请求(也称为心跳)保持会话不过期。ping请求由ZooKeeper的客户端库自动发送,因此在我们的代码中不需要考虑如何维护会话。这个时间长度的设置应当足够低,以便能档检测出服务器故障(由读超时体现),并且能够在会话超时的时间段内重新莲接到另外一台服务器。 zookeeper数据同步过程:zab protocol Leader election leader选举过程,electionEpoch自增,在选举的时候lastProcessedZxid越大,越有可能成为leaderDiscovery: 第一:leader收集follower的lastProcessedZxid,这个主要用来通过和leader的lastProcessedZxid对比来确认follower需要同步的数据范围 第二:选举出一个新的peerEpoch,主要用于防止旧的leader来进行提交操作(旧leader向follower发送命令的时候,follower发现zxid所在的peerEpoch比现在的小,则直接拒绝,防止出现不一致性)Synchronization: follower中的事务日志和leader保持一致的过程,就是依据follower和leader之间的lastProcessedZxid进行,follower多的话则删除掉多余部分,follower少的话则补充,一旦对应不上则follower删除掉对不上的zxid及其之后的部分然后再从leader同步该部分之后的数据Broadcast 正常处理客户端请求的过程。leader针对客户端的事务请求,然后提出一个议案,发给所有的follower,一旦过半的follower回复OK的话,leader就可以将该议案进行提交了,向所有follower发送提交该议案的请求,leader同时返回OK响应给客户端实际上zookeeper中算法三阶段:FSE=>Recovery=>Broadcast(广播和上面的一致) fast leader election基于fast paxos。发送给所有的节点。没有随机leader参与收集。 LOOKING:进入leader选举状态FOLLOWING:leader选举结束,进入follower状态LEADING:leader选举结束,进入leader状态OBSERVING:处于观察者状态1.serverA首先将electionEpoch自增,然后为自己投票2 serverB接收到上述通知,然后进行投票PK如果serverB收到的通知中的electionEpoch比自己的大,则serverB更新自己的electionEpoch为serverA的electionEpoch如果该serverB收到的通知中的electionEpoch比自己的小,则serverB向serverA发送一个通知,将serverB自己的投票以及electionEpoch发送给serverA,serverA收到后就会更新自己的electionEpoch在electionEpoch达成一致后,就开始进行投票之间的pk,优先比较proposedEpoch,然后优先比较proposedZxid,最后优先比较proposedLeaderpk完毕后,如果本机器投票被pk掉,则更新投票信息为对方投票信息,同时重新发送该投票信息给所有的server。如果本机器投票没有被pk掉,如果是looking,过半更改状态,如果FOLLOWING/LEADING说明落后,加速收敛Recovery略:https://my.oschina.net/pingpa...follower读写过程图: ectd

April 26, 2019 · 1 min · jiezi

Zookeeper安装

在本地window 64环境下进行的安装,zk官网,下载的是3.4.14。解压并切换到conf目录,修改zoo_sample.cfg修改为zoo.cfg,zoo.cfg中相关参数说明: # zk里使用的基本时间单位#tickTime=2000# leader和follower之间最长的心跳时间,心跳时间为initLimit*tickTime,#如果为10,则这里的心跳时间为10 * 2000=20000ms=20s#initLimit=10# leader和follower之间发送消息,请求和应答的最大时间长度,时间长度为syncLimit*tickTime,#如果为5,则这里的时间长度为5*2000=10000ms=10s#syncLimit=5# zk数据目录#dataDir=/zookeeper_data# 端口#clientPort=2181# 客户端连接上限,可增加#maxClientCnxns=60进入D:zookeeper-3.4.14bin,双击zkServer.cmd 2019-04-23 17:44:08,580 [myid:] - INFO [main:Environment@100] - Server environment:java.compiler=<NA>2019-04-23 17:44:08,581 [myid:] - INFO [main:Environment@100] - Server environment:os.name=Windows 72019-04-23 17:44:08,582 [myid:] - INFO [main:Environment@100] - Server environment:os.arch=amd642019-04-23 17:44:08,582 [myid:] - INFO [main:Environment@100] - Server environment:os.version=6.12019-04-23 17:44:08,587 [myid:] - INFO [main:Environment@100] - Server environment:user.name=weilu2019-04-23 17:44:08,588 [myid:] - INFO [main:Environment@100] - Server environment:user.home=C:\Users\weilu2019-04-23 17:44:08,589 [myid:] - INFO [main:Environment@100] - Server environment:user.dir=D:\zookeeper-3.4.14\bin2019-04-23 17:44:08,596 [myid:] - INFO [main:ZooKeeperServer@836] - tickTime set to 20002019-04-23 17:44:08,596 [myid:] - INFO [main:ZooKeeperServer@845] - minSessionTimeout set to -12019-04-23 17:44:08,597 [myid:] - INFO [main:ZooKeeperServer@854] - maxSessionTimeout set to -12019-04-23 17:44:08,690 [myid:] - INFO [main:ServerCnxnFactory@117] - Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory2019-04-23 17:44:08,693 [myid:] - INFO [main:NIOServerCnxnFactory@89] - binding to port 0.0.0.0/0.0.0.0:2181开一个command,进入到安装zk的bin目录下,执行:zkCli.cmd -server localhost:2181部分输出: ...

April 23, 2019 · 2 min · jiezi

zookeeper的安装

zookeeper的安装环境 CentorOS7(以下简称linux)安装包 zookeeper-3.5.4-beta.tar.gz官网:https://archive.apache.org/dist/zookeeper/步骤 通过文件上传工具(filezilla)将文件上传到linux环境上;(我这里解压包在downInfo目录下)   解压 解压zk 命令 tar -zxvf zookeeper-3.5.4-beta.tar.gz        解压成功 修改配置文件 命令:   cd /zookeeper-3.4.5/confcp zoo_sample.cfg zoo.cfg 创建zkdata文件夹,存放数据 命令: mkdir zkdata 编辑zoo.conf文件,修改其dataDir路径,如下图所示 命令: vim  zoo.conf 运行 命令:  cd zookeeper-3.4.6/bin./zkServer.sh start 检查是否安装成功 命令:./zkCli.sh 测试 zk命令create,get,delete等 创建  create /**  "aa"(后面必须带有"/") 获取 通过get /命令 删除  通过delete /命令

April 23, 2019 · 1 min · jiezi

duubo报错:一个NoClassDefFoundError:factories/SerializerFactory问题解决

duubo编译错误 NoClassDefFoundError,factories/SerializerFactory说一个挺有意思的解释dubbo关系,dubbo分为服务者和消费者,服务者比作司机,消费者比作乘客,zookeeper比作滴滴APP,双方之间的建立关系都在这个APP体现)说正题了,昨天写了一个dubbo 提供的接口报错,百思不得其解。第一眼感觉是序列化的问题,实际已经加上了:1.序列化的问题,缺少Serializable(bean实现Serializable接口即可)Serialized class com.yykj.mall.dto.ProductListItemDTO must implement java.io.Serializable报错信息如下:Caused by: java.lang.NoClassDefFoundError: com/esotericsoftware/kryo/factories/SerializerFactoryat com.alibaba.dubbo.common.serialize.support.kryo.KryoFactory.createKryo(KryoFactory.java:74)at com.alibaba.dubbo.common.serialize.support.kryo.PooledKryoFactory.getKryo(PooledKryoFactory.java:43)at com.alibaba.dubbo.common.serialize.support.kryo.KryoObjectOutput.<init>(KryoObjectOutput.java:31)at com.alibaba.dubbo.common.serialize.support.kryo.KryoSerialization.serialize(KryoSerialization.java:43)at com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encodeRequest(ExchangeCodec.java:240)at com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encode(ExchangeCodec.java:76)at com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec.encode(DubboCountCodec.java:39)at com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalEncoder.encode(NettyCodecAdapter.java:81)at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:66)at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59)at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:784)at org.jboss.netty.channel.SimpleChannelHandler.writeRequested(SimpleChannelHandler.java:292)at com.alibaba.dubbo.remoting.transport.netty.NettyHandler.writeRequested(NettyHandler.java:99)at org.jboss.netty.channel.SimpleChannelHandler.handleDownstream(SimpleChannelHandler.java:254)at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)at org.jboss.netty.channel.Channels.write(Channels.java:704)at org.jboss.netty.channel.Channels.write(Channels.java:671)at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:348)at com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(NettyChannel.java:98)… 52 moreCaused by: java.lang.ClassNotFoundException: com.esotericsoftware.kryo.factories.SerializerFactoryat java.net.URLClassLoader.findClass(URLClassLoader.java:382)at java.lang.ClassLoader.loadClass(ClassLoader.java:424)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)at java.lang.ClassLoader.loadClass(ClassLoader.java:357)… 73 more出现问题,总是感觉都配对了,很完美,嗯嗯嗯,但是就是一直这个错,感觉是一个心病一样缠绕。。。。所以,想一想列出几项错误:1.配置问题2.接口调用(使用命令)3.使用工具验证 (dubbo-admin)1.首先保证配置正确(好像是废话,但是我不删)2.使用dubbo-admin注册服务中心,等于zookeeper的可视化界面,服务端和消费端接口正常情况3.使用CMD或win10 PowerShell操作,telnet本地的dubbo端口,telnet 127.0.0.1 20880 回车(找之前的问题截屏,太费劲就手写了)dubbo> lsdubbo> com.dubbo.IDubboService// 你的接口dubbo> ls IDubboService // 查看你的方法是否存在select // 三个方法insertupdate dubbo> invoke select(“哈哈哈”)[{“哈哈哈”,“中国”,“xxx@qq.com”}] // 说明接口能调通,没有问题说了这么多,总之前面的问题还没有解决,从另一方面解决,保证其他项是没有问题的(如配置,接口实现等)想了好久,SerializerFactory感觉还是序列化出问题1.dubbo请求接口正常,说明配置是没有问题的,问题出现在我消费者调用服务者的时候或,我在启动消费者过程中发现这个错,表示转换有问题,不兼容,不匹配——-> 检查版本–查看zookper版本,dubbo引包的版本服务者消费者哦,果然是,总结:版本不兼容,确实是,替换一样的版本就好了,原因在于,项目过多,依赖好多jar不一定是这个版本,所以,保证双方之间版本一致性是很重要的,解决很多调用的时候,或者异常错误不明朗,高版本和低版本差异等,检查版本往往是有效果的,夜深了就说这么多了。

April 16, 2019 · 1 min · jiezi

什么是ZooKeeper?

前言只有光头才能变强。文本已收录至我的GitHub仓库,欢迎Star:https://github.com/ZhongFuCheng3y/3y上次写了一篇 什么是消息队列?以后,本来想入门一下Kafka的(装一下环境、看看Kafka一些概念啥的)。后来发现Kafka用到了ZooKeeper,而我又对ZooKeeper不了解,所以想先来学学什么是ZooKeeper,再去看看什么是Kafka。ZooKeeper相信大家已经听过这个词了,不知道大家对他了解多少呢?我第一次听到ZooKeeper的时候是在学Eureka的时候(外行人都能看懂的SpringCloud,错过了血亏!),同样ZooKeeper也可以作为注册中心。后面听到ZooKeeper的时候,是因为ZooKeeper可以作为分布式锁的一种实现。直至在了解Kafka的时候,发现Kafka也需要依赖ZooKeeper。Kafka使用ZooKeeper管理自己的元数据配置。这篇文章来写写我学习ZooKeeper的笔记,如果有错的地方希望大家可以在评论区指出。一、什么是ZooKeeper从上面我们也可以发现,好像哪都有ZooKeeper的身影,那什么是ZooKeeper呢?我们先去官网看看介绍:官网还有另一段话:ZooKeeper: A Distributed Coordination Service for Distributed Applications相比于官网的介绍,我其实更喜欢Wiki中对ZooKeeper的介绍:(留下不懂英语的泪水)我简单概括一下:ZooKeeper主要服务于分布式系统,可以用ZooKeeper来做:统一配置管理、统一命名服务、分布式锁、集群管理。使用分布式系统就无法避免对节点管理的问题(需要实时感知节点的状态、对节点进行统一管理等等),而由于这些问题处理起来可能相对麻烦和提高了系统的复杂性,ZooKeeper作为一个能够通用解决这些问题的中间件就应运而生了。二、为什么ZooKeeper能干这么多?从上面我们可以知道,可以用ZooKeeper来做:统一配置管理、统一命名服务、分布式锁、集群管理。这里我们先不管统一配置管理、统一命名服务、分布式锁、集群管理每个具体的含义(后面会讲)那为什么ZooKeeper可以干那么多事?来看看ZooKeeper究竟是何方神物,在Wiki中其实也有提到:ZooKeeper nodes store their data in a hierarchical name space, much like a file system or a tree) data structureZooKeeper的数据结构,跟Unix文件系统非常类似,可以看做是一颗树,每个节点叫做ZNode。每一个节点可以通过路径来标识,结构图如下:那ZooKeeper这颗"树"有什么特点呢??ZooKeeper的节点我们称之为Znode,Znode分为两种类型:短暂/临时(Ephemeral):当客户端和服务端断开连接后,所创建的Znode(节点)会自动删除持久(Persistent):当客户端和服务端断开连接后,所创建的Znode(节点)不会删除ZooKeeper和Redis一样,也是C/S结构(分成客户端和服务端)2.1 监听器在上面我们已经简单知道了ZooKeeper的数据结构了,ZooKeeper还配合了监听器才能够做那么多事的。常见的监听场景有以下两项:监听Znode节点的数据变化监听子节点的增减变化没错,通过监听+Znode节点(持久/短暂[临时]),ZooKeeper就可以玩出这么多花样了。三、ZooKeeper是怎么做到的?下面我们来看看用ZooKeeper怎么来做:统一配置管理、统一命名服务、分布式锁、集群管理。3.1 统一配置管理比如我们现在有三个系统A、B、C,他们有三份配置,分别是ASystem.yml、BSystem.yml、CSystem.yml,然后,这三份配置又非常类似,很多的配置项几乎都一样。此时,如果我们要改变其中一份配置项的信息,很可能其他两份都要改。并且,改变了配置项的信息很可能就要重启系统于是,我们希望把ASystem.yml、BSystem.yml、CSystem.yml相同的配置项抽取出来成一份公用的配置common.yml,并且即便common.yml改了,也不需要系统A、B、C重启。做法:我们可以将common.yml这份配置放在ZooKeeper的Znode节点中,系统A、B、C监听着这个Znode节点有无变更,如果变更了,及时响应。参考资料:基于zookeeper实现统一配置管理https://blog.csdn.net/u011320740/article/details/787426253.2 统一命名服务统一命名服务的理解其实跟域名一样,是我们为这某一部分的资源给它取一个名字,别人通过这个名字就可以拿到对应的资源。比如说,现在我有一个域名www.java3y.com,但我这个域名下有多台机器:192.168.1.1192.168.1.2192.168.1.3192.168.1.4别人访问www.java3y.com即可访问到我的机器,而不是通过IP去访问。3.3 分布式锁锁的概念在这我就不说了,如果对锁概念还不太了解的同学,可参考下面的文章Java锁?分布式锁?乐观锁?行锁?我们可以使用ZooKeeper来实现分布式锁,那是怎么做的呢??下面来看看:系统A、B、C都去访问/locks节点访问的时候会创建带顺序号的临时/短暂(EPHEMERAL_SEQUENTIAL)节点,比如,系统A创建了id_000000节点,系统B创建了id_000002节点,系统C创建了id_000001节点。接着,拿到/locks节点下的所有子节点(id_000000,id_000001,id_000002),判断自己创建的是不是最小的那个节点如果是,则拿到锁。释放锁:执行完操作后,把创建的节点给删掉如果不是,则监听比自己要小1的节点变化举个例子:系统A拿到/locks节点下的所有子节点,经过比较,发现自己(id_000000),是所有子节点最小的。所以得到锁系统B拿到/locks节点下的所有子节点,经过比较,发现自己(id_000002),不是所有子节点最小的。所以监听比自己小1的节点id_000001的状态系统C拿到/locks节点下的所有子节点,经过比较,发现自己(id_000001),不是所有子节点最小的。所以监听比自己小1的节点id_000000的状态……等到系统A执行完操作以后,将自己创建的节点删除(id_000000)。通过监听,系统C发现id_000000节点已经删除了,发现自己已经是最小的节点了,于是顺利拿到锁….系统B如上3.4集群状态经过上面几个例子,我相信大家也很容易想到ZooKeeper是怎么"感知"节点的动态新增或者删除的了。还是以我们三个系统A、B、C为例,在ZooKeeper中创建临时节点即可:只要系统A挂了,那/groupMember/A这个节点就会删除,通过监听groupMember下的子节点,系统B和C就能够感知到系统A已经挂了。(新增也是同理)除了能够感知节点的上下线变化,ZooKeeper还可以实现动态选举Master的功能。(如果集群是主从架构模式下)原理也很简单,如果想要实现动态选举Master的功能,Znode节点的类型是带顺序号的临时节点(EPHEMERAL_SEQUENTIAL)就好了。Zookeeper会每次选举最小编号的作为Master,如果Master挂了,自然对应的Znode节点就会删除。然后让新的最小编号作为Master,这样就可以实现动态选举的功能了。最后这篇文章主要讲解了ZooKeeper的入门相关的知识,ZooKeeper通过Znode的节点类型+监听机制就实现那么多好用的功能了!当然了,ZooKeeper要考虑的事没那么简单的,后面有机会深入的话,我还会继续分享,希望这篇文章对大家有所帮助~参考资料:分布式服务框架 Zookeeperhttps://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/index.htmlZooKeeper初识整理(老酒装新瓶)https://lxkaka.wang/2017/12/21/zookeeper/ZooKeeperhttps://www.cnblogs.com/sunshine-long/p/9057191.htmlZooKeeper 的应用场景https://zhuanlan.zhihu.com/p/59669985乐于输出干货的Java技术公众号:Java3y。公众号内有200多篇原创技术文章、海量视频资源、精美脑图,不妨来关注一下!觉得我的文章写得不错,不妨点一下赞!

April 15, 2019 · 1 min · jiezi

RPC架构之SOA服务化架构学习(一)

传统垂直应用架构背景:传统垂直MVC项目简单分为展示层.业务逻辑层.数据访问层缺点:如1.复杂应用的开发维护成本变高,部署效率逐渐降低 2.团队协作效率差,部分公共功能重复开发,代码重复率居高不下 3.系统可靠性变差。随着业务的发展,访问量逐渐攀升,网络流量、负载均衡、数据库连接等都面临着巨大的压力.走向:当垂直引用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心,使前端应用能更快速的响应多变的市场需求。同时将公共能力API抽取出来,作为独立的公共服务供其他调用者消费,以实现服务的共享和重用,降低开发和运维成本。应用拆分之后会按照模块独立部署,接口调用由本地API演进成跨进程的远程方法调用,此时RPC框架应运而生具体可参考《分布式服务框架原理与实践》集群管理,负载均衡负载均衡有F5硬件负载均衡和软负载均衡.这里我简单讲下软负载均衡,nginx的反向代理服务很好的实现了集群管理,负载均衡.反向代理就是根据客户端的请求,从其关系的一组或多组后端服务器上获取资源,然后再将这些资源返回给客户端,客户端只会得知反向代理的IP地址,而不知道在代理服务器后面的服务器簇的存在.session失效: nginx默认算法是轮询服务器,这有一个问题session会失效解决办法1:upstream里设置ip_hash即采用哈希算法则可以解决这个问题,某个用户请求了A服务器,接下来该用户只会请求A服务器,除非A挂了,则会请求转入别的服务器,这时候还是会存在session失效的问题.解决办法2:session共享,比如2个tomcat来说,session共享需要发生网络通信也就是会建立连接,如果集群有多个,多个请求同时到每个不同tomcat,比如100个请求到100个不同tomcat,则会把100个的session共享到另外99个tomcat,则此时连接就100了,集群越多性能反而大大降低了.因此nginx自身session共享不建议,轮询算法中可通过别的方法,如redis共享session.初步学习分布式,理解较为浅,后续还会改动~~~

April 10, 2019 · 1 min · jiezi

女朋友也能看懂的Zookeeper分布式锁原理

前言关于分布式锁,在互联网行业的使用场景还是比较多的,比如电商的库存扣减,秒杀活动,集群定时任务执行等需要进程互斥的场景。而实现分布式锁的手段也很多,大家比较常见的就是redis跟zookeeper,今天我们主要介绍的是基于zookeeper实现的分布式锁。这篇文章主要借用Curator框架对zk分布式锁的实现思路,大家理解了以后完全可以自己手动实现一遍,但是在工作中还是建议使用成熟的开源框架,很多坑别人已经帮我们踩好了,除非万不得已,需要高度定制符合自己项目的需求的时候,才开始自行封装吧。正文zookeeper简单介绍既然是基于zookeeper的分布式锁,首先肯定要对这个zookeeper有一定了解,这里就不过多的进行讲解,只对其跟分布式锁有关联的特性做一个简单的介绍,更多详细的功能特性大家可以参阅官方文档。zookeeper维护着类似文件系统的数据结构,它总共有四种类型的节点PERSISTENT:持久化的节点。一旦创建后,即使客户端与zk断开了连接,该节点依然存在。PERSISTENT_SEQUENTIAL:持久化顺序编号节点。比PERSISTENT节点多了节点自动按照顺序编号。EPHEMERAL:临时节点。当客户端与zk断开连接之后,该节点就被删除。EPHEMERAL_SEQUENTIAL:临时顺序编号节点。比EPHEMERAL节点多了节点自动按照顺序编号。(分布式锁实现使用该节点类型)Curator实现分布式锁原理好,当我们简单了解了zk的节点类型以后,现在正式的分析Curator分布式锁的实现原理。这里我们定义了一个“/curator_lock”锁节点用来存放相关客户端创建的临时顺序节点。 假设两个客户端ClientA跟ClientB同时去争夺一个锁,此时ClientA先行一步获得了锁,那么它将会在我们的zk上创建一个“/curator_lock/xxxxx-0000000000”的临时顺序节点。接着它会拿到“/curator_lock/”锁节点下的所有子节点,因为这些节点是有序的,这时候会判断它所创建的节点是否排在第一位(也就是序号最小),由于ClientA是第一个创建节点的的客户端,必然是排在第一位,所以它也就拿到了锁。[zk: localhost:2182(CONNECTED) 4] ls /curator_lock[_c_f3f38067-8bff-47ef-9628-e638cfaad77e-lock-0000000000]这个时候ClientB也来了,按照同样的步骤,先是在“/curator_lock/”下创建一个临时顺序节点“/curator_lock/xxxxx-0000000001”,接着也是获得该节点下的所有子节点信息,并比对自己生成的节点序号是否最小,由于此时序号最小的节点是ClientA创建的,并且还没释放掉,所以ClientB自己就拿不到锁。[zk: localhost:2182(CONNECTED) 4] ls /curator_lock[_c_2a8198e4-2039-4a3c-8606-39c65790d637-lock-0000000001,_c_f3f38067-8bff-47ef-9628-e638cfaad77e-lock-0000000000]既然ClientB拿不到锁,也不会放弃,它会对自己的前一个节点加上监听器(zk提供的api实现),只要监听到前一个节点被删除了,也就是释放了锁,就会马上重新执行获取锁的操作。当后面的ClientC,ClientD…过来的时候也是如此,变化的只是节点上的编号,它会根据Client连接的数量而不断增加。可能大家还会担心,万一我的获取到锁的客户端宕机了怎么办,会不会不释放锁?其实上面已经解答了这个问题,由于Curator使用的是临时顺序节点来实现的分布式锁,只要客户端与zk连接断开,该节点也就消失了,相当于释放了锁。下面代码展示了Curator的基本使用方法,仅作为参考实例,请勿在生产环境使用的这么随意。CuratorFramework client = CuratorFrameworkFactory.newClient(“127.0.0.1:2182”, 5000,10000, new ExponentialBackoffRetry(1000, 3)); client.start(); InterProcessMutex interProcessMutex = new InterProcessMutex(client, “/curator_lock”); //加锁 interProcessMutex.acquire(); //业务逻辑 //释放锁 interProcessMutex.release(); client.close(); 总结我们在搞懂了原理之后,就可以抛弃Curator,自己动手实现一个分布式锁了,相信大家实现基本的功能都是没问题的,但是要做到生产级别,可能还是要在细节上下功夫,比如说一些异常处理,性能优化等因素。微信公众号《深夜里的程序猿》 - 分享最干的干货

April 10, 2019 · 1 min · jiezi

dubbo设置连接zookeeper权限

前言关于zookeeper知之甚少,少之又少,只是作为dubbo的注册中心连接,某天某检测机构随手一扫然后说你们zookeeper 没有设置任何安全验证,当时就懵了,还有这种操作。zookeeper设置ACL 权限查阅dubbo的官方文档dubbo-registry发现连接注册中心的时候是可以选择是否需要用户名密码,接下来就是要如何设置zookeeper的用户名跟密码进入zookeeper的bin文件夹运行客户端./zkCli.sh-help 查看指令[zk: localhost:2181(CONNECTED) 0] -helpZooKeeper -server host:port cmd args stat path [watch] set path data [version] ls path [watch] delquota [-n|-b] path ls2 path [watch] setAcl path acl setquota -n|-b val path history redo cmdno printwatches on|off delete path [version] sync path listquota path rmr path get path [watch] create [-s] [-e] path data acl addauth scheme auth quit getAcl path close connect host:port如果在dubbo中没有指定分组的话,dubbo会默认生成一个分组dubbo,也就是在zookeeper下面会有个子节点dubbo也可以自己手动创建create /dubboZookeeper的ACL通过scheme🆔permissions来构成权限scheme这边主要用到2种方式,另外还有设置ip和host,这几个没用到的这边就先不细说1.auth方式(密码明文)添加用户名和密码addauth digest onepay:onepay授予/dubbo auth权限setAcl /dubbo auth:onepay:onepay:rwadc配置dubbo连接zookeeper配置文件<dubbo:registry protocol =“zookeeper” address=“127.0.0.1:2181” username=“onepay” password=“onepay” client=“curator” />2.digest授权方式(方式跟auth差不多)授予/dubbo digest权限setAcl /dubbo digest:onepay:T+17ezPAW0kDvN6elPD5Tdzdm00=:cdrwaaddauth digest onepay:onepay配置zookeeper配置文件<dubbo:registry protocol =“zookeeper” address=“127.0.0.1:2181” username=“onepay” password=“onepay” client=“curator” />digest 密码生成方式:把密码进行sha1编码然后对结果进行base64编码BASE64(SHA1(password))查看zookeeper源码发现,其实包里面已经有现成的方法,直接调用这个类生成就行,idPassword字符串格式: username:passwordorg.apache.zookeeper.server.auth.DigestAuthenticationProvider static public String generateDigest(String idPassword) throws NoSuchAlgorithmException { String parts[] = idPassword.split(":", 2); byte digest[] = MessageDigest.getInstance(“SHA1”).digest( idPassword.getBytes()); return parts[0] + “:” + base64Encode(digest); }还有一个点就是要设置client=“curator"通过ZookeeperRegistry发现zookeeper的连接是通过zookeeperTransporter进行创建,zookeeperTransporter接口分别由CuratorZookeeperTransporterZkclientZookeeperTransporter实现,这2个分别创建CuratorZookeeperClient和ZkclientZookeeperClientpublic class ZkclientZookeeperTransporter implements ZookeeperTransporter { public ZookeeperClient connect(URL url) { return new ZkclientZookeeperClient(url); }}public class CuratorZookeeperTransporter implements ZookeeperTransporter { public ZookeeperClient connect(URL url) { return new CuratorZookeeperClient(url); }}查看源码发现ZkclientZookeeperClient是没有进行设置zookeeper的auth的账号和密码,CuratorZookeeperClient有去获取配置的相关用户信息。 public ZkclientZookeeperClient(URL url) { super(url); client = new ZkClient(url.getBackupAddress()); client.subscribeStateChanges(new IZkStateListener() { public void handleStateChanged(KeeperState state) throws Exception { ZkclientZookeeperClient.this.state = state; if (state == KeeperState.Disconnected) { stateChanged(StateListener.DISCONNECTED); } else if (state == KeeperState.SyncConnected) { stateChanged(StateListener.CONNECTED); } } public void handleNewSession() throws Exception { stateChanged(StateListener.RECONNECTED); } }); } public CuratorZookeeperClient(URL url) { super(url); try { Builder builder = CuratorFrameworkFactory.builder() .connectString(url.getBackupAddress()) .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000)) .connectionTimeoutMs(5000); String authority = url.getAuthority(); if (authority != null && authority.length() > 0) { builder = builder.authorization(“digest”, authority.getBytes()); } client = builder.build(); client.getConnectionStateListenable().addListener(new ConnectionStateListener() { public void stateChanged(CuratorFramework client, ConnectionState state) { if (state == ConnectionState.LOST) { CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED); } else if (state == ConnectionState.CONNECTED) { CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED); } else if (state == ConnectionState.RECONNECTED) { CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED); } } }); client.start(); } catch (IOException e) { throw new IllegalStateException(e.getMessage(), e); } }cdrwa表示zookeeper的五种权限CREATE: 创建子节点READ: 获取节点数据或者当前节点的子节点列表WRITE: 节点设置数据DELETE: 删除子节点ADMIN: 节点设置权限如果用户名密码错误,或者没设置,会报KeeperErrorCode = NoAuth错误注:停止zookeeper,清除zookeeper文件夹下面的logs,或者用delete 删除节点 就可以清除权限以上参考文档Apache Zookeeper Setting ACL ...

April 3, 2019 · 2 min · jiezi

zookeeper伪集群搭建及遇到的坑

今天搭建了zookeeper的单机伪集群,记录一下防止忘记安装从官网下载安装包解压到本地目录,比如D:/zookeeper-3.4.10配置为了运行3个 zookeeper 服务端进程,新建存放这3个进程运行和配置数据的目录,比如叫 z1、z2、z3 。在这3个目录下都建一个 data 目录用于存放进程运行时的数据,接着在这3个目录都新建一个叫 myid 的文件,内容分别为1、2、3(即 z1 下 myid 的内容是1、z2 下 myid 的内容是2、z3 下 myid 的内容是3),最后在这3个目录下都建一个 .cfg 结尾的配置文件。z1 目录下的配置文件叫 z1.cfg,内容如下:tickTime=2000initLimit=10syncLimit=5dataDir=$(z1所在目录的全路径)/dataclientPort=2981server.1=127.0.0.1:2222:2223server.2=127.0.0.1:3333:3334server.3=127.0.0.1:4444:4445z2 目录下的配置文件叫 z2.cfg,内容如下:tickTime=2000initLimit=10syncLimit=5dataDir=$(z2所在目录的全路径)/dataclientPort=2982server.1=127.0.0.1:2222:2223server.2=127.0.0.1:3333:3334server.3=127.0.0.1:4444:4445z3 目录下的配置文件叫 z3.cfg,内容如下:tickTime=2000initLimit=10syncLimit=5dataDir=$(z3所在目录的全路径)/dataclientPort=2983server.1=127.0.0.1:2222:2223server.2=127.0.0.1:3333:3334server.3=127.0.0.1:4444:4445tickTime:服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每隔 tickTime 时间就会发送一个心跳,以毫秒为单位。也是 zookeeper 中的时间单元,zookeeper 中所有时间都是以这个时间单元为基础,进行整数倍配置的。例如,session 的最小超时时间是 2*tickTime 。initLimit 和 syncLimit:都是表示连接的心跳数,具体含义暂时可以不用管。dataDir:zookeeper 保存数据的目录,默认情况下 zookeeper 写数据的日志文件也保存在这个目录里。clientPort:客户端连接服务器的端口,zookeeper 会监听这个端口,接受客户端的访问请求。server.N:XXXX:P1:P2 。其中 N 表示服务器编号,XXXX 表示该服务器的 IP 地址,P1 和 P2 是两个 TCP 端口号,分别用于仲裁和 Learder 选举。服务器编号也对应着上面配置的 myid 文件的内容,比如上面 z1 目录下的 myid 内容是 1 ,也就是这里的 server.启动分别启动3个 zookeeper 进程,启动时使用上面新建的配置文件启动 z1 :sh $(zookeeper压缩包解压后的全路径)/bin/zkServer.sh start $(z1所在目录的全路径)/z1.cfg启动 z2 :sh $(zookeeper压缩包解压后的全路径)/bin/zkServer.sh start $(z2所在目录的全路径)/z2.cfg启动 z3 :sh $(zookeeper压缩包解压后的全路径)/bin/zkServer.sh start $(z3所在目录的全路径)/z3.cfg当看到如下信息表示 zookeeper 的进程启动好了坑在启动的过程中遇到一个坑,在启动好后用sh $(zookeeper压缩包解压后的全路径)/bin/zkServer.sh status $(z2所在目录的全路径)/z2.cfg报错如下解决过程如下:使用./zkServer.sh start-foreground /mnt/d/zkData/z1/zoo.cfg使zookeeper前台运行,抛如下异常但是我已经写了myid文件了,怎么找不到呢?其实是这样,我在windows10的linux sub system里运行的,不能用windows里的路径格式,要改为这样的格式/mnt/d/{zookeeper myid路径},这个坑是windows的坑啊。。。验证用telnet连接客户端端口,如下说明成功 ...

March 19, 2019 · 1 min · jiezi

Kafka - Zookeeper/Kafka集群的部署

下载kafka,自带 zookeeper。搭建Zookeeper集群zookeeper 集群使用 Raft 选举模式,故至少要三个节点(生产中应部署在三个不同的服务器实例上,这里用于演示就不那么做了)。# 复制三分节点配置cp config/zookeeper.properties config/zookeeper.2181.propertiescp config/zookeeper.properties config/zookeeper.2182.propertiescp config/zookeeper.properties config/zookeeper.2183.properties修改配置config/zookeeper.2181.properties# the directory where the snapshot is stored.dataDir=/tmp/zookeeper/2181# the port at which the clients will connectclientPort=2181# disable the per-ip limit on the number of connections since this is a non-production configmaxClientCnxns=0tickTime=2000initLimit=10syncLimit=5server.1=localhost:12888:13888server.2=localhost:22888:23888server.3=localhost:32888:33888config/zookeeper.2182.properties 修改clientPort=2182 dataDir=/tmp/zookeeper/2182 其他一致config/zookeeper.2183.properties 修改clientPort=2183 dataDir=/tmp/zookeeper/2183 其他一致主要是修改服务端口clientPort和数据目录dataDir,其他参数表征如下:tickTime=2000为zk的基本时间单元,毫秒initLimit=10Leader-Follower初始通信时限(tickTime10)syncLimit=5Leader-Follower同步通信时限(tickTime5)server.实例集群标识=实例地址:数据通信端口:选举通信端口为实例添加集群标识echo 1 >> /tmp/zookeeper/2181/myidecho 2 >> /tmp/zookeeper/2182/myidecho 3 >> /tmp/zookeeper/2183/myid启动集群服务bin/zookeeper-server-start.sh config/zookeeper.2181.propertiesbin/zookeeper-server-start.sh config/zookeeper.2182.propertiesbin/zookeeper-server-start.sh config/zookeeper.2183.properties搭建Kafka集群Kafka集群节点>=2时便可对外提供高可用服务cp config/server.properties config/server.9092.propertiescp config/server.properties config/server.9093.properties修改节点标识、服务端口、数据目录和zk集群节点列表vi config/server.9092.propertiesbroker.id=1…listeners=PLAINTEXT://:9092…log.dirs=/tmp/kafka-logs/1…zookeeper.connect=localhost:2181,localhost:2182,localhost:2183vi config/server.9093.propertiesbroker.id=2…listeners=PLAINTEXT://:9093…log.dirs=/tmp/kafka-logs/2…zookeeper.connect=localhost:2181,localhost:2182,localhost:2183启动集群bin/kafka-server-start.sh config/server.9092.propertiesbin/kafka-server-start.sh config/server.9093.propertiesTopic管理创建topicbin/kafka-topics.sh –create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 2 --partition 4 --topic topic_1–replication-factor 2:副本集数量,不能大于 broker 节点数量,多了也没用,1个节点放>=2个副本挂了都完蛋。–partition 4:分区数查看topic列表bin/kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 –listtopic_1topic_2查看Topic详情可以描述Topic分区数/副本数/副本Leader/副本ISR等信息:bin/kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --describe –topic topic_1Topic:topic_1 PartitionCount:4 ReplicationFactor:2 Configs: Topic: topic_1 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: topic_1 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: topic_1 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: topic_1 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2删除Topic注意,只是删除Topic在zk的元数据,日志数据仍需手动删除。bin/kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --delete –topic topic_2#Topic topic_2 is marked for deletion.#Note: This will have no impact if delete.topic.enable is not set to true.#再查看topic列表bin/kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 –list#topic_1#topic_2 - marked for deletion生产者bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093 --topic topic_1# 进入 cli 输入消息回车发送# hello kafka [enter]# send message [enter]消费者新模式,offset存储在borker–new-consumer Use new consumer. This is the default.–bootstrap-server <server to connectto> REQUIRED (unless old consumer is used): The server to connect to.老消费模式,offset存储在zk–zookeeper <urls> REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.创建消费者bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092,localhost:9093 --from-beginning --topic topic_1可以尝试创建多个不同消费组的消费者(这里的sh脚本创建的都是不同消费组的),订阅同一个topic来实现发布订阅模式。查看消费组/消费者bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092,localhost:9093 --list#这里有两个消费组的消费者console-consumer-47566console-consumer-50875查看消费详情可以查看到消费的订阅的 topic,负责的 partition,消费进度 offset, 积压的消息LAG。bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092,localhost:9093 --group console-consumer-47566 --describeGROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNERconsole-consumer-47566 topic_1 0 2 2 0 consumer-1_/127.0.0.1console-consumer-47566 topic_1 1 3 3 0 consumer-1_/127.0.0.1console-consumer-47566 topic_1 2 2 3 1 consumer-1_/127.0.0.1console-consumer-47566 topic_1 3 0 3 3 consumer-1_/127.0.0.1 ...

March 18, 2019 · 2 min · jiezi

zookeeper2

数据发布订阅/ 配置中心实现配置信息的集中式管理和数据的动态更新实现配置中心有两种模式:push 、pull。长轮训zookeeper采用的是推拉相结合的方式。 客户端向服务器端注册自己需要关注的节点。一旦节点数据发生变化,那么服务器端就会向客户端发送watcher事件通知。客户端收到通知后,主动到服务器端获取更新后的数据数据量比较小数据内容在运行时会发生动态变更集群中的各个机器共享配置负载均衡请求/数据分摊多个计算机单元上分布式锁通常实现分布式锁有几种方式redis。 setNX 存在则会返回0, 不存在数据方式去实现创建一个表, 通过索引唯一的方式create table (id , methodname …) methodname增加唯一索引insert 一条数据XXX delete 语句删除这条记录mysql for update 行锁,杜占锁zookeeper实现排他锁利用路径唯一共享锁(读锁) locks当中是有序节点,控制使用权限,每一个客户端写一个节点之后,获取到最小节点,获取数据,有写的操作,优先处理写的节点,利用节点特性实现共享锁,使用java api的方式package zk.lock;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.ZooKeeper;import java.io.IOException;import java.util.List;import java.util.Random;import java.util.SortedSet;import java.util.TreeSet;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;public class DistributeLock { //根节点 private static final String ROOT_LOCKS = “/LOCKS”; private ZooKeeper zooKeeper; //节点的数据 private static final byte[] data = {1, 2}; //会话超时时间 private int sessionTimeOut; //记录锁节点id private String lockID; private CountDownLatch countDownLatch = new CountDownLatch(1); public DistributeLock() throws IOException, InterruptedException { this.zooKeeper = ZookeeperClient.getInstance(); this.sessionTimeOut = ZookeeperClient.getSESSIONTIMEOUT(); } /** * 获取锁的方法 * * @return / public synchronized boolean lock() { try { //LOCKS/000001 lockID = zooKeeper.create(ROOT_LOCKS + “/”, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName() + “->” + “成功创建了lock节点[” + lockID + “,开始竞争锁]”); //获取根节点下的所有子节点 List<String> childrenNodes = zooKeeper.getChildren(ROOT_LOCKS, true); // 排序,从小到大 TreeSet<String> sortedSet = new TreeSet<>(); for (String children : childrenNodes) { sortedSet.add(ROOT_LOCKS + “/” + children); } //获取到最小的节点 String first = sortedSet.first(); if (lockID.equals(first)) { //表示当前就是最小的节点 System.out.println(Thread.currentThread().getName() + “–>成功获得锁,locak节点为:【” + lockID + “]”); return true; } SortedSet<String> lessThanLockId = sortedSet.headSet(lockID); if (!lessThanLockId.isEmpty()) { //获取到比当前LockId这个节点更小的上一个节点 String prevLockId = lessThanLockId.last(); //监控上一个节点 zooKeeper.exists(prevLockId, new LockWatcher(countDownLatch)); //如果会话超时或者节点被删除(释放)了 countDownLatch.await(sessionTimeOut, TimeUnit.MILLISECONDS); System.out.println(Thread.currentThread().getName() + “成功获取锁:【” + lockID + “】”); return true; } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } public synchronized boolean unLock() { System.out.println(Thread.currentThread().getName() + “–>开始释放锁”); try { zooKeeper.delete(lockID, -1); System.out.println(“节点” + lockID + “被释放了”); return true; } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } return false; } public static void main(String[] args) { CountDownLatch countDownLatch = new CountDownLatch(10); Random random = new Random(); for (int i = 0; i < 10; i++) { new Thread(() -> { DistributeLock lock = null; try { lock = new DistributeLock(); countDownLatch.countDown(); countDownLatch.await(); lock.lock(); Thread.sleep(random.nextInt(500)); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (lock != null) { lock.unLock(); } } }).start(); } }}package zk.lock;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import java.util.concurrent.CountDownLatch;public class LockWatcher implements Watcher { private CountDownLatch countDownLatch; public LockWatcher(CountDownLatch latch) { this.countDownLatch = latch; } @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeDataChanged) { countDownLatch.countDown(); } }}命名服务master选举 724小时可用, 99.999%可用master-slave模式 slave监听master节点,如果master节点挂掉,slave自动接替master,心跳机制维持,出现网络异常,slave认为master挂掉了,可能出现双主节点的情况,重复处理数据使用zookeeper解决上述问题,往某一个节点注册master节点,只有一个能够注册成功,注册成功的为master,如果失去联系,节点会被删除,不会出现脑裂问题 per实现原理讲解分布式队列master选举改成多线程(多进程)模型(master-slave) 创建三个工程,while去抢分布式队列activeMQ、kafka、….先进先出队列通过getChildren获取指定根节点下的所有子节点,子节点就是任务确定自己节点在子节点中的顺序如果自己不是最小的子节点,那么监控比自己小的上一个子节点,否则处于等待接收watcher通知,重复流程Barrier模式 在一个节点下只能允许多少数据,只有子节点达到一定数量,才执行 curator 提供应用场景的封装 curator-reciples master/leader选举 分布式锁(读锁、写锁)分布式队列LeaderLatch写一个master LeaderSelector每一个应用都写一个临时有序节点,根据最小的节点来获得优先权 curator 提供应用场景的封装 curator-reciples master/leader选举 分布式锁(读锁、写锁)分布式队列…LeaderLatch 写一个master LeaderSelector 每一个应用都写一个临时有序节点,根据最小的节点来获得优先权zookeeper集群角色leader leader是zookeeper集群的核心。事务请求的唯一调度者和处理者,保证集群事务处理的顺序性集群内部各个服务器的调度者follower处理客户端非事务请求,以及转发事务请求给leader服务器参与事务请求提议(proposal)的投票(客户端的一个事务请求,需要半数服务器投票通过以后才能通知leader commit; leader会发起一个提案,要求follower投票)参与leader选举的投票observer 观察zookeeper集群中最新状态的变化并将这些状态同步到observer服务器上。 增加observer不影响集群中事务处理能力,同时还能提升集群的非事务处理能力zookeeper的集群组成 zookeeper一般是由 2n+1台服务器组成leader选举选举算法: leaderElection AuthFastLeaderElection FastLeaderElectionQuorumPeer startLeaderElection源码地址:https://github.com/apache/zoo…需要的条件: jdk 1.7以上 、ant 、ideaFastLeaderElectionserverid : 在配置server集群的时候,给定服务器的标识id(myid)zxid : 服务器在运行时产生的数据ID, zxid的值越大,表示数据越新Epoch: 选举的轮数server的状态:Looking、 Following、Observering、Leading 第一次初始化启动的时候: LOOKING所有在集群中的server都会推荐自己为leader,然后把(myid、zxid、epoch)作为广播信息,广播给集群中的其他server, 然后等待其他服务器返回每个服务器都会接收来自集群中的其他服务器的投票。集群中的每个服务器在接受到投票后,开始判断投票的有效性a) 判断逻辑时钟(Epoch) ,如果Epoch大于自己当前的Epoch,说明自己保存的Epoch是过期。更新Epoch,同时clear其他服务器发送过来的选举数据。判断是否需要更新当前自己的选举情况b) 如果Epoch小于目前的Epoch,说明对方的epoch过期了,也就意味着对方服务器的选举轮数是过期的。这个时候,只需要讲自己的信息发送给对方c) 如果sid等于当前sid,根据规则来判断是否有资格获得leader 接受到来自其他服务器的投票后,针对每一个投票,都需要将别人的投票和自己的投票进行对比,zxid,zxid最大的服务器优先统计投票ZAB协议拜占庭问题一组拜占庭将军分别各率领一支军队共同围困一座城市。为了简化问题,将各支军队的行动策略限定为进攻或撤离两种。因为部分军队进攻部分军队撤离可能会造成灾难性后果,因此各位将军必须通过投票来达成一致策略,即所有军队一起进攻或所有军队一起撤离。因为各位将军分处城市不同方向,他们只能通过信使互相联系。在投票过程中每位将军都将自己投票给进攻还是撤退的信息通过信使分别通知其他所有将军,这样一来每位将军根据自己的投票和其他所有将军送来的信息就可以知道共同的投票结果而决定行动策略。系统的问题在于,将军中可能出现叛徒,他们不仅可能向较为糟糕的策略投票,还可能选择性地发送投票信息。假设有9位将军投票,其中1名叛徒。8名忠诚的将军中出现了4人投进攻,4人投撤离的情况。这时候叛徒可能故意给4名投进攻的将领送信表示投票进攻,而给4名投撤离的将领送信表示投撤离。这样一来在4名投进攻的将领看来,投票结果是5人投进攻,从而发起进攻;而在4名投撤离的将军看来则是5人投撤离。这样各支军队的一致协同就遭到了破坏。由于将军之间需要通过信使通讯,叛变将军可能通过伪造信件来以其他将军的身份发送假投票。而即使在保证所有将军忠诚的情况下,也不能排除信使被敌人截杀,甚至被敌人间谍替换等情况。因此很难通过保证人员可靠性及通讯可靠性来解决问题。假始那些忠诚(或是没有出错)的将军仍然能通过多数决定来决定他们的战略,便称达到了拜占庭容错。在此,票都会有一个默认值,若消息(票)没有被收到,则使用此默认值来投票。上述的故事映射到计算机系统里,将军便成了计算机,而信差就是通信系统。虽然上述的问题涉及了电子化的决策支持与信息安全,却没办法单纯的用密码学与数字签名来解决。因为电路错误仍可能影响整个加密过程,这不是密码学与数字签名算法在解决的问题。因此计算机就有可能将错误的结果提交去,亦可能导致错误的决策。 paxos协议主要就是如何保证在分布式环网络环境下,各个服务器如何达成一致最终保证数据的一致性问题ZAB协议,基于paxos协议的一个改进。zab协议为分布式协调服务zookeeper专门设计的一种支持崩溃恢复的原子广播协议zookeeper并没有完全采用paxos算法, 而是采用zab Zookeeper atomic broadcastzab协议的原理在zookeeper 的主备模式下,通过zab协议来保证集群中各个副本数据的一致性zookeeper使用的是单一的主进程来接收并处理所有的事务请求,并采用zab协议,把数据的状态变更以事务请求的形式广播到其他的节点zab协议在主备模型架构中,保证了同一时刻只能有一个主进程来广播服务器的状态变更所有的事务请求必须由全局唯一的服务器来协调处理,这个的服务器叫leader,其他的叫followerleader节点主要负责把客户端的事务请求转化成一个事务提议(proposal),并分发给集群中的所有follower节点再等待所有follower节点的反馈。一旦超过半数服务器进行了正确的反馈,那么leader就会commit这条消息崩溃恢复原子广播zab协议的工作原理什么情况下zab协议会进入崩溃恢复模式(没有接受到的数据进行同步)当服务器启动时当leader服务器出现网络中断、崩溃或者重启的情况集群中已经不存在过半的服务器与该leader保持正常通信zab协议进入崩溃恢复模式会做什么当leader出现问题,zab协议进入崩溃恢复模式,并且选举出新的leader。当新的leader选举出来以后,如果集群中已经有过半机器完成了leader服务器的状态同(数据同步),退出崩溃恢复,进入消息广播模式当新的机器加入到集群中的时候,如果已经存在leader服务器,那么新加入的服务器就会自觉进入数据恢复模式,找到leader进行数据同步问题 假设一个事务在leader服务器被提交了,并且已经有过半的follower返回了ack。 在leader节点把commit消息发送给follower机器之前leader服务器挂了怎么办 zab协议,一定需要保证已经被leader提交的事务也能够被所有follower提交zab协议需要保证,在崩溃恢复过程中跳过哪些已经被丢弃的事务回顾zookeeper数据模型 临时节点(有序)、 持久化节点(有序) 临时节点其他节点也能看到zookeeper是一个开源的分布式协调框架; 数据发布订阅、负载均衡、集群、master选举。。。原子性: 要么同时成功、要么同时失败 (分布式事务)单一视图: 无论客户端连接到哪个服务器,所看到的模型都是一样可靠性:一旦服务器端提交了一个事务并且获得了服务器端返回成功的标识,那么这个事务所引起的服务器端的变更会一直保留实时性: 近实时zookeeper并不是用来存储数据的,通过监控数据状态的变化,达到基于数据的集群管理。分布式集群配置修改zoo.cfgserver.id=ip:port:port 第一个Port 数据同步通信、 第二个port :leader选举(3181)id=myid (myid 参与leader选举、 在整个集群中表示唯一服务器的标识)dataDir目录下 创建一个myid的文件 , 内容: server.id对应当前服务器的id号如果增加observer 需要在第一步中, server.id=ip:port:port:observer ; peerType=observer会话NOT_CONNECTED - > CONNECTING ->CONNECTED ->ClOSE数据模型数据模型是一个树形结构,最小的数据单元是ZNODE临时节点和持久化节点临时有序节点持久化有序节点状态信息StatcZxid = 0xb0000000f ctime = Sun Aug 13 20:24:03 CST 2017mZxid = 0xb0000000fmtime = Sun Aug 13 20:24:03 CST 2017pZxid = 0xb0000000fcversion = 0dataVersion = 0aclVersion = 0ephemeralOwner = 0x15dda30f72f0000dataLength = 2numChildren = 0zab协议 : 如果客户端发了一个事务请求给到leader, 而leader发送给各个follower以后,并且收到了ack,leader已经commit。 在准备ack给各个follower节点comit的时候,leader挂了,怎么处理的。选举新的leader(zxid的最大值)同步给其他的folowerwatcherEventyTypeNone 客户端与服务器端成功建立会话NodeCreated 节点创建NodeDeleted 节点删除NodeDataChanged 数据变更:数据内容NodeChildrenChanged 子节点发生变更: 子节点删除、新增的时候,才会触发watcher的特性一次性触发: 事件被处理一次后,会被移除,如果需要永久监听,则需要反复注册zkClient ( 永久监听的封装)curator java api的话, zk.exists , zk.getData 创建一个watcher监听zookeeper序列化使用的是JuteAcl权限的操作保证存储在zookeeper上的数据安全性问题schema(ip/Digest/world/super)授权对象(192.168.1.1/11 , root:root / world:anyone/ super)数据存储内存数据和磁盘数据zookeeper会定时把数据存储在磁盘上。DataDir = 存储的是数据的快照快照: 存储某一个时刻全量的内存数据内容DataLogDir 存储事务日志zxid :服务器运行时产生的日志idlog.zxid查看事务日志的命令java -cp :/mic/data/program/zookeeper-3.4.10/lib/slf4j-api-1.6.1.jar:/mic/data/program/zookeeper-3.4.10/zookeeper-3.4.10.jar org.apache.zookeeper.server.LogFormatter log.200000001zookeeper 有三种日志zookeeper.out //运行日志快照 存储某一时刻的全量数据事务日志 事务操作的日志记录 ...

March 8, 2019 · 3 min · jiezi

zookeeper

分布式协调服务-zookeeper分布式环境的特点分布性并发性程序运行过程中,并发性操作是很常见的。比如同一个分布式系统中的多个节点,同时访问一个共享资源。数据库、分布式存储无序性进程之间的消息通信,会出现顺序不一致问题分布式环境下面临的问题网络通信网络本身的不可靠性,因此会涉及到一些网络通信问题网络分区(脑裂)当网络发生异常导致分布式系统中部分节点之间的网络延时不断增大,最终导致组成分布式架构的所有节点,只有部分节点能够正常通信三态在分布式架构里面,成功、失败、超时分布式事务ACID(原子性、一致性、隔离性、持久性)中心化和去中心化冷备或者热备分布式架构里面,很多的架构思想采用的是:当集群发生故障的时候,集群中的人群会自动“选举”出一个新的领导。最典型的是: zookeeper / etcd经典的CAP/BASE理论CAPC(一致性 Consistency): 所有节点上的数据,时刻保持一致可用性(Availability):每个请求都能够收到一个响应,无论响应成功或者失败分区容错 (Partition-tolerance):表示系统出现脑裂以后,可能导致某些server与集群中的其他机器失去联系CP / APCAP理论仅适用于原子读写的Nosql场景,不适用于数据库系统BASE基于CAP理论,CAP理论并不适用于数据库事务(因为更新一些错误的数据而导致数据出现紊乱,无论什么样的数据库高可用方案都是徒劳) ,虽然XA事务可以保证数据库在分布式系统下的ACID特性,但是会带来性能方面的影响;eBay尝试了一种完全不同的套路,放宽了对事务ACID的要求。提出了BASE理论Basically available : 数据库采用分片模式, 把100W的用户数据分布在5个实例上。如果破坏了其中一个实例,仍然可以保证80%的用户可用soft-state: 在基于client-server模式的系统中,server端是否有状态,决定了系统是否具备良好的水平扩展、负载均衡、故障恢复等特性。Server端承诺会维护client端状态数据,这个状态仅仅维持一小段时间, 这段时间以后,server端就会丢弃这个状态,恢复正常状态Eventually consistent:数据的最终一致性初步认识zookeeperzookeeper是一个开源的分布式协调服务,是由雅虎创建的,基于google chubby。zookeeper是什么分布式数据一致性的解决方案zookeeper能做什么 数据的发布/订阅(配置中心:disconf) 负载均衡(dubbo利用了zookeeper机制实现负载均衡) 命名服务 master选举(kafka、hadoop、hbase) 分布式队列 分布式锁zookeeper的特性顺序一致性 从同一个客户端发起的事务请求,最终会严格按照顺序被应用到zookeeper中原子性 所有的事务请求的处理结果在整个集群中的所有机器上的应用情况是一致的,也就是说,要么整个集群中的所有机器都成功应用了某一事务、要么全都不应用可靠性 一旦服务器成功应用了某一个事务数据,并且对客户端做了响应,那么这个数据在整个集群中一定是同步并且保留下来的实时性 一旦一个事务被成功应用,客户端就能够立即从服务器端读取到事务变更后的最新数据状态;(zookeeper仅仅保证在一定时间内,近实时)zookeeper安装单机环境安装下载zookeeper的安装包http://apache.fayea.com/zooke…解压zookeeper tar -zxvf zookeeper-3.4.10.tar.gzcd 到 ZK_HOME/conf , copy一份zoo.cfg cp zoo_sample.cfg zoo.cfgsh zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}sh zkCli.sh -server ip:port默认端口 2181127.0.0.1:2181 zkServer.sh start-foreground集群环境建议使用奇数对于复制模式,至少需要三台服务器,强烈建议您使用奇数个服务器。如果您只有两台服务器,那么您处于这样的情况:如果其中一台服务器出现故障,则没有足够的机器来构成多数仲裁。两台服务器本质上 不如 单一服务器稳定,因为有两个单点故障。zookeeper集群, 包含三种角色:leader : 所有写和更改操作接受,转发到其他节点,接受所有Follower的提案请求并统一协调发起提案的投票,负责与所有的Follower进行内部的数据交换(同步)follower : 直接为客户端服务并参与提案的投票,同时与Leader进行数据交换(同步)observer: 直接为客户端服务但不参与提案的投票,同时也与Leader进行数据交换(同步)observerobserver 是一种特殊的zookeeper节点。可以帮助解决zookeeper的扩展性(如果大量客户端访问我们zookeeper集群,需要增加zookeeper集群机器数量。从而增加zookeeper集群的性能。 导致zookeeper写性能下降, zookeeper的数据变更需要半数以上服务器投票通过。造成网络消耗增加投票成本)observer不参与投票。 只接收投票结果。不属于zookeeper的关键部位。在zoo.cfg里面增加在集群中的每一台服务器必须感知其他服务器(1,2,3表示id,取值范围 1-255)peerType=observerserver.1=host:2181:3181:observerserver.2=host:2181:3181server.3=host:2181:3181quorumListenOnAllIPs=true第一步: 修改配置文件 server.id=host:port:port id的取值范围: 1~255; 用id来标识该机器在集群中的机器序号 2181是zookeeper的端口; //3306 3181表示leader选举的端口 server.1=host:2181:3181 server.2=host:2181:3181 server.3=host:2181:3181第二步:创建myid 在每一个服务器的dataDir目录下创建一个myid的文件,文件就一行数据,数据内容是每台机器对应的server ID的数字第三步:启动zookeeper分布式系统里面的特点分布式系统架构存在的问题中心化和去中心化CAP和BASEzookeeper的安装 单机环境安装/集群环境安装zookeeper的特性1. zookeeper的客户端使用2. zoo.cfg里面配置信息的讲解3. zookeeper的一些常见概念模型4. zookeeper java客户端的使用集群的角色: leader :followerobserver:避免写性能下降,不需要参与写集群的搭建修改zoo.cfg 129/135/136 server.id=ip:port:port server.1=host:2888:3181 2888表示follower节点与leader节点交换信息的端口号 3181 如果leader节点挂掉了, 需要一个端口来重新选举。 server.2=host:2888:3181 server.3=host:2888:3181zoo.cfg中有一个dataDir = /tmp/zookeeper $dataDir/myid 添加一个myid文件。启动服务 如果需要增加observer节点zoo.cfg中 增加 ;peerType=observerserver.1=host:2888:3181 server.2=host:2888:3181 server.3=host:2888:3181:observerzoo.cfg配置文件分析tickTime=2000 zookeeper中最小的时间单位长度 (ms)initLimit=10 follower节点启动后与leader节点完成数据同步的时间syncLimit=5 leader节点和follower节点进行心跳检测的最大延时时间dataDir=/tmp/zookeeper 表示zookeeper服务器存储快照文件的目录dataLogDir 表示配置 zookeeper事务日志的存储路径,默认指定在dataDir目录下clientPort 表示客户端和服务端建立连接的端口号: 2181zookeeper中的一些概念数据模型zookeeper的数据模型和文件系统类似,每一个节点称为:znode. 是zookeeper中的最小数据单元。每一个znode上都可以保存数据和挂载子节点。 从而构成一个层次化的属性结构节点特性持久化节点 : 节点创建后会一直存在zookeeper服务器上,直到主动删除持久化有序节点 :每个节点都会为它的一级子节点维护一个顺序临时节点 : 临时节点的生命周期和客户端的会话保持一致。当客户端会话失效,该节点自动清理,临时节点下面不能挂子节点临时有序节点 : 在临时节点上多勒一个顺序性特性会话Watcherzookeeper提供了分布式数据发布/订阅,zookeeper允许客户端向服务器注册一个watcher监听。当服务器端的节点触发指定事件的时候会触发watcher。服务端会向客户端发送一个事件通知watcher的通知是一次性,一旦触发一次通知后,该watcher就失效ACLzookeeper提供控制节点访问权限的功能,用于有效的保证zookeeper中数据的安全性。避免误操作而导致系统出现重大事故。CREATE READWRITEDELETEADMINzookeeper的命令操作1. create [-s]【-e】 path data acl -s 表示节点是否有序 -e 表示是否为临时节点 默认情况下,是持久化节点 path需要/开头2. get path [watch] 获得指定 path的信息3.set path data [version] 修改节点 path对应的data 乐观锁的概念 每个节点创建好之后会有一个dataversion,默认是0,如果后续此值更改之后,dataversion会更改,我们可以通过version来保证乐观锁 数据库里面有一个 version 字段去控制数据行的版本号4.delete path [version] 删除节点stat信息 cversion = 0 子节点的版本号 aclVersion = 0 表示acl的版本号,修改节点权限 dataVersion = 1 表示的是当前节点数据的版本号 czxid 节点被创建时的事务ID mzxid 节点最后一次被更新的事务ID pzxid 当前节点下的子节点最后一次被修改时的事务ID ctime = Sat Aug 05 20:48:26 CST 2017 创建时间 mtime = Sat Aug 05 20:48:50 CST 2017 修改时间 cZxid = 0x500000015 ctime = Sat Aug 05 20:48:26 CST 2017 mZxid = 0x500000016 mtime = Sat Aug 05 20:48:50 CST 2017 pZxid = 0x500000015 cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 创建临时节点的时候,会有一个sessionId 。 该值存储的就是这个sessionid,客户端有一定时间生效时间,具有重试机制,过一段时间判断是否有会话,如果没有,则删除 dataLength = 3 数据值长度 numChildren = 0 子节点数java API的使用导入jar包<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.8</version></dependency>public class CreateNodeDemo implements Watcher { private final static String CONNECT_STRING = “host:2181,” + “host:2181,” + “host:2181”; private static CountDownLatch countDownLatch = new CountDownLatch(1); private static ZooKeeper zooKeeper = null; private static Stat stat = new Stat(); public static void main(String[] args) throws IOException, InterruptedException, KeeperException { zooKeeper = new ZooKeeper(CONNECT_STRING, 5000, new CreateNodeDemo()); countDownLatch.await(); System.out.println(zooKeeper.getState()); String path = zooKeeper.create( “/java1”, “123”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //watcher是一次性的,所以使用一次之后需要重新注册 zooKeeper.getData("/java1", true, stat); System.out.println(“节点创建成功:” + path); zooKeeper.setData("/java1", “231”.getBytes(), -1); zooKeeper.delete("/java1", -1); zooKeeper.create("/java1/dd", “da”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Thread.sleep(3000); List<String> children = zooKeeper.getChildren("/java1", true); for (String child : children) { System.out.println(child); } //权限模式 } public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { if (watchedEvent.getType() == watchedEvent.getType() && null == watchedEvent.getPath()) { countDownLatch.countDown(); System.out.println(watchedEvent.getState() + “–>” + watchedEvent.getType()); } if (watchedEvent.getType() == Event.EventType.NodeCreated) { try { System.out.println(“创建节点” + watchedEvent.getPath() + “改变后的值是” + zooKeeper.getData(watchedEvent.getPath(), true, stat)); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } if (watchedEvent.getType() == Event.EventType.NodeDeleted) { System.out.println(“删除节点” + watchedEvent.getPath()); } if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) { try { System.out.println(“子节点改变” + watchedEvent.getPath() + “改变后的值是” + zooKeeper.getData(watchedEvent.getPath(), true, stat)); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } if (watchedEvent.getType() == Event.EventType.NodeDataChanged) { System.out.println("————–"); try { System.out.println(“数据改变” + watchedEvent.getPath() + “改变后的值是” + zooKeeper.getData(watchedEvent.getPath(), true, stat)); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); }}权限控制模式schema:授权对象ip : hostDigest : username:passwordworld : 开放式的权限控制模式,数据节点的访问权限对所有用户开放。 world:anyonesuper :超级用户,可以对zookeeper上的数据节点进行操作连接状态KeeperStat.Expired 在一定时间内客户端没有收到服务器的通知, 则认为当前的会话已经过期了。KeeperStat.Disconnected 断开连接的状态KeeperStat.SyncConnected 客户端和服务器端在某一个节点上建立连接,并且完成一次version、zxid同步KeeperStat.authFailed 授权失败事件类型NodeCreated 当节点被创建的时候,触发NodeChildrenChanged 表示子节点被创建、被删除、子节点数据发生变化NodeDataChanged 节点数据发生变化NodeDeleted 节点被删除None 客户端和服务器端连接状态发生变化的时候,事件类型就是Nonezkclient<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version></dependency>提供了递归创建父节点递归删除节点的功能curatorCurator本身是Netflix公司开源的zookeeper客户端;curator提供了各种应用场景的实现封装curator-framework 提供了fluent风格api,curator-replice 提供了实现封装curator连接的重试策略ExponentialBackoffRetry() 衰减重试 RetryNTimes 指定最大重试次数RetryOneTime 仅重试一次RetryUnitilElapsed 一直重试知道规定的时间zookeeper的实际应用场景zookeeper能够实现哪些场景订阅发布watcher机制统一配置管理(disconf)分布式锁redis setnxzookeeper 节点特性数据库 负载均衡ID生成器分布式队列统一命名服务master选举分布式锁master选举 ...

March 7, 2019 · 3 min · jiezi

new URI(zk_servers_1) 路径包含下划线无法获取host的问题

spring cloud gateway使用zookeeper作为注册中心调用其它服务的时候报了下面这个错误:ava.lang.NullPointerException: null at io.netty.util.NetUtil.isValidIpV4Address(NetUtil.java:648) ~[netty-common-4.1.29.Final.jar:4.1.29.Final] at io.netty.util.NetUtil.createByteArrayFromIpAddressString(NetUtil.java:368) ~[netty-common-4.1.29.Final.jar:4.1.29.Final] at reactor.ipc.netty.options.InetSocketAddressUtil.attemptParsingIpString(InetSocketAddressUtil.java:132) ~[reactor-netty-0.7.10.RELEASE.jar:0.7.10.RELEASE] at reactor.ipc.netty.options.InetSocketAddressUtil.createForIpString(InetSocketAddressUtil.java:80) ~[reactor-netty-0.7.10.RELEASE.jar:0.7.10.RELEASE] at reactor.ipc.netty.options.InetSocketAddressUtil.createInetSocketAddress(InetSocketAddressUtil.java:69) ~[reactor-netty-0.7.10.RELEASE.jar:0.7.10.RELEASE] at reactor.ipc.netty.options.ClientOptions.createInetSocketAddress(ClientOptions.java:253) ~[reactor-netty-0.7.10.RELEASE.jar:0.7.10.RELEASE] at reactor.ipc.netty.http.client.HttpClientOptions.getRemoteAddress(HttpClientOptions.java:87) ~[reactor-netty-0.7.10.RELEASE.jar:0.7.10.RELEASE] at reactor.ipc.netty.http.client.MonoHttpClientResponse.lambda$subscribe$0(MonoHttpClientResponse.java:76) ~[reactor-netty-0.7.10.RELEASE.jar:0.7.10.RELEASE]调用的地址是http://zks_servers_1:18001,zks_servers_1是服务的hosts配置的名称,空指针异常跟踪发现是下面这个问题导致的://类HttpClientOptions public final InetSocketAddress getRemoteAddress(URI uri) { Objects.requireNonNull(uri, “uri”); boolean secure = isSecure(uri); int port = uri.getPort() != -1 ? uri.getPort() : (secure ? 443 : 80); boolean shouldResolveAddress = !this.useProxy(uri.getHost()); return this.createInetSocketAddress(uri.getHost(), port, shouldResolveAddress); }uri.getHost()返回值是null,也就是说根据上面的调用地址,没有获取到对应的host。uri的创建方式是://类MonoHttpClientResponseMonoHttpClientResponse(HttpClient parent, String url, HttpMethod method, Function<? super HttpClientRequest, ? extends Publisher<Void>> handler) { this.parent = parent; boolean isWs = Objects.equals(method, HttpClient.WS); try { this.startURI = new URI(parent.options.formatSchemeAndHost(url, isWs)); } catch (URISyntaxException var7) { throw Exceptions.bubble(var7); } this.method = isWs ? HttpMethod.GET : method; this.handler = handler; }创建方式是调用URI的new URI(String)方法,知道原因之后在本地测试:try { URI uri = new URI(“http://zks_servers_1:18001/test.html”); String host = uri.getHost(); System.out.println(host); } catch (URISyntaxException e) { e.printStackTrace(); }确实获取不到host,查看源码发现: /** * Returns the host component of this URI. * * <li><p> A domain name consisting of one or more <i>labels</i> * separated by period characters ({@code ‘.’}), optionally followed by * a period character. Each label consists of <i>alphanum</i> characters * as well as hyphen characters ({@code ‘-’}), though hyphens never * occur as the first or last characters in a label. The rightmost * label of a domain name consisting of two or more labels, begins * with an <i>alpha</i> character. </li> * </ul> * * The host component of a URI cannot contain escaped octets, hence this * method does not perform any decoding. * * @return The host component of this URI, * or {@code null} if the host is undefined */ public String getHost() { return host; }谷歌翻译:由一个或多个标签组成的域名 由句点字符代码’.‘分隔,可选地后跟 一个英文句号角色。 每个标签由alphanum字符组成 以及连字符字符代码’ - ‘,虽然连字符永远不会 作为标签中的第一个或最后一个字符出现。 最右边包含最少长度最少两个并且以英文字符开始的标签举例如下:www.baidu.com,这个域名包含三个标签www、baidu、com;www.baidu-zhidao.com,这个域名包含三个标签www、baidu-zhidao、com;这个例子就是说明每个标签都可以使用-连接;然后看一下,我的报错的服务名称zks_servers_1,这个名称没有以【.】分割,包含了非法字符【_】最后以单个数字结尾也不符合要求。实际上测试发现zks-servers-1这样也是不正确的,不知道是不是翻译的有问题,这个结果和翻译不太匹配。总而言之,修改服务器的hosts配置就行了。 ...

February 28, 2019 · 2 min · jiezi

Curator: ZooKeeper的使用配方

Curator: ZooKeeper的使用配方ZooKeeper 作为分布式的存储方式, 有很多种使用场景, 把典型的使用场景提取出来, 成为"配方", 方便用户参考. Curator 作为这些典型场景的具体实现框架, 进一步简化了用户的使用成本.Curator 实现了 ZooKeeper 配方文档中列出的所有配方(两阶段提交除外)。单击下面的配方名称以获取详细文档。注意:大多数 Curator 配方将自动创建配方的路径的父节点, 默认值为 CreateMode.CONTAINER (即znode 为容器节点, 用于锁, 选举等功能, 空容器可能会被节点删除)。另请参阅有关“Curator Recipes Own Their ZNode/Paths”的技术说明7。选举领导者闩锁 - 在分布式计算中,领导者选举是指定单个流程作为分布在多个计算机(节点)中的某个任务的组织者的过程。在任务开始之前,所有网络节点都不知道哪个节点将充当任务的“领导者”或协调者。然而,在运行了领导者选举算法之后,整个网络中的每个节点都将特定的唯一节点识别为任务领导者。领导人选举 - Curator 领导人选举配方。锁共享重入锁 - 完全分布式锁,全局同步,意味着在任何快照时,没有两个客户端认为它们具有相同的锁定。共享锁 - 与共享重入锁类似,但不可重入。共享可重入读写锁 - 可跨 JVM 运行的可重入读/写互斥锁。读写锁保持一对相关的锁,一个用于只读操作,一个用于写入。只要没有写入时,读锁定可以由多个读取器进程同时保持。写锁是独占的。共享信号量 - 一种适用于跨 JVM 的计数信号量。所有进程使用相同锁定路径的在全部 JVM 中的都将实现进程间有限的租约。此外,这个信号量大多是“公平的” - 每个用户将按照要求的顺序获得租约(从ZK的角度来看)。多共享锁 - 将多个锁作为单个实体进行管理的容器。调用acquire() 时,将获取所有锁。如果失败,则释放所有已获取的路径。类似地,当调用release() 时,将释放所有锁(忽略失败)。屏障屏障 - 分布式系统使用屏障来阻止一组节点的处理,直到满足条件,此时允许所有节点继续运行。双重屏障 - 双重屏障使客户端能够同步计算的开始和结束。当足够的进程加入屏障时,进程开始计算并在完成后离开屏障。计数器共享计数器 - 管理共享整数。观看相同路径的所有客户端将具有共享整数的最新值(考虑ZK的正常一致性保证)。分布式原子长整形 - 一个尝试原子增量的计数器。它首先尝试使用乐观锁定。如果失败,则采用可选的 InterProcessMutex。对于乐观和互斥两种情况,都有重试策略用于重试增量。高速缓存路径缓存 - 路径缓存用于观察 ZNode。每当添加,更新或删除子项时,路径缓存将更改其状态以包含当前子项集,子项的数据和子项的状态。Curator 框架中的路径缓存由 PathChildrenCache 类提供。对路径的更改将传递给已注册的 PathChildrenCacheListener 实例。节点缓存 - 一种试图保持本地缓存的节点数据的实用程序。此类将监听节点,响应更新/创建/删除事件,下拉数据等。您可以注册一个侦听器, 在发生更改时将收到通知。树缓存 - 一种实用程序,它尝试在本地缓存 Z K路径的所有子节点的所有数据。此类将观察 ZK 路径,响应更新/创建/删除事件,下拉数据等。您可以注册一个将在发生更改时收到通知的侦听器。节点持久节点 - 尝试一直保持在 ZooKeeper 中的节点,即使通过连接和会话中断也是如此。持久性TTL节点 - 当您需要创建 TTL 节点, 但又不希望通过定期手动设置数据来保持其活动时非常有用。集团成员 - 集团成员管理。将此实例添加到组中,并在组中保留成员的缓存。队列分布式队列 - 分布式队列 ZK 配方的实现。保证排入队列的项目(通过 ZK 的PERSISTENTSEQUENTIAL 节点)。如果单个消费者从队列中取出物品,他们将按FIFO 排序。如果顺序很重要,请使用 LeaderSelector 指定单个消费者。分布式Id队列 - DistributedQueue 的一个版本,它允许ID与队列项相关联。如果需要,可以从队列中删除项目。分布式优先级队列 - 一种分布式优先级队列 ZK 配方的实现。分布式延迟队列 - 一种分布式延迟队列的实现。简单分布式队列 - ZK分发附带的 DistributedQueue 的替代品。Curator 名称来源策展人(curator)的产生于西方博物馆、美术馆体系的建立密切相关。英文“curator”一词在英汉词典中以前通常被翻译成“博物馆馆长”、“掌管者”、或“监护人”等,和艺术关系最为密切的大概是“博物馆馆长”。事实上,在西方语境中,“curator”作为职业最早也主要是指16世纪以来随着私人博物馆的兴起而出现的在馆内负责藏品研究、保管和陈列的专职人员。在某些情况下,curator可能也就是馆长(director),亦需负责博物馆的行政管理、资金筹集和社会关系等。后来,随着艺术品在馆藏中数量增多和重要性上升,出现了专门负责馆藏艺术品研究、保管和陈列的人员。参考文献http://curator.apache.org/cur… ...

February 23, 2019 · 1 min · jiezi

【zookeeper】第2篇:一致性协议

2PC什么是2PC2PC是Two-Phase Commit的缩写,即二阶提交协议。它是将事务的提交过程分成两个阶段来处理的,分别是提交事务请求阶段和执行事务提交阶段;二阶提交协议常用于保证分布式系统的数据一致性。提交事务请求阶段1. 事务提交询问协调者向参与者发送事务内容,并询问是否可执行事务提交操作,等待参与者的应答结果。2. 事务执行参与者对询问的事务内容进行操作,并将Undo和Redo的信息记录到事务日志中。3. 各参与者给协调者反馈响应结果反馈结果包括可以执行事务或者不可以执行事务。二阶提交的第一阶段也可以称作“投票阶段”,即各参与者投票表明是否要继续执行事务提交操作。执行事务提交阶段协调者根据参与者的反馈结果决定是否可以进行事务提交操作,一般有以下两种情况:1. 正常执行事务提交发送事务提交请求协调者向参与者发送事务commit请求。执行事务提交参与者接受到commit请求后,正式执行事务,并在事务执行完成之后释放资源。反馈事务提交执行结果参与者完成事务提交后,向协调者发送ack消息。完成事务 协调者接收到所有参与者的ack消息后事务完成。2. 中断事务发送事务回滚请求协调者向所有参与者发出rollback请求。执行事务回滚参与者接受到rollback请求后,利用第一阶段记录的Undo信息执行事务回滚操,然后释放资源。反馈事务回滚结果参与者向协调者发送ack信息。事务中断完成协调者收到所有ack消息后表示完成事务中断。2PC的优缺点优点简单易于理解,容易实现。缺点同步阻塞为什么说是同步阻塞呢?因为在执行过程各参与者在等待其他参与者时,将无法执行其他操作。协调者单点协调者在整个二阶提交中起到了关键性作用,一旦协调者出现了问题,操作将无法执行下去。存在数据不一致的可能性在二阶提交的第二阶段中,即执行事务提交操作时,如果出现网络异常或导致部分参与者接受到了commit请求,部分参与者没有接收到commit请求,此时就会导致数据不一致。3PC什么是3PC3PC是Three-Phase Commit的缩写,即三阶提交,是对二阶提交的改进。canCommit阶段preCommit阶段doCommit阶段

January 26, 2019 · 1 min · jiezi

ZooKeeper Internals -- ZooKeeper内部工作方式

ZooKeeper Internals介绍原子广播保证,属性和定义领导者激活活动消息摘要比较法定人数记录开发者指南记录在正确的级别使用标准的slf4j成语介绍本文档包含有关ZooKeeper内部工作方式的信息。到目前为止,它讨论了以下主题:原子广播记录原子广播ZooKeeper的核心是一个原子消息系统,可以使所有服务器保持同步。保证,属性和定义ZooKeeper使用的消息传递系统提供的特定保证如下:可靠的交付:如果消息m由一台服务器提供,它将最终由所有服务器提供。总订单:如果一条服务器在消息b之前传递消息,则所有服务器将在b之前传送消息。如果a和b是传递的消息,则a将在b之前传递或b将在传递之前传递。因果顺序:如果在b的发送者发送消息a之后发送消息b,则必须在b之前订购。如果发送方在发送b后发送c,则必须在b之后订购c。ZooKeeper消息传递系统还需要高效,可靠,易于实施和维护。我们大量使用消息传递,因此我们需要系统能够每秒处理数千个请求。虽然我们可以要求至少k + 1个正确的服务器来发送新消息,但我们必须能够从相关故障中恢复,例如断电。当我们实施该系统时,我们几乎没有时间和很少的工程资源,因此我们需要一个工程师可以访问的协议,并且易于实现。我们发现我们的协议满足了所有这些目标。我们的协议假设我们可以在服务器之间构建点对点FIFO通道。虽然类似的服务通常假设消息传递可能丢失或重新排序消息,但我们假设FIFO通道非常实用,因为我们使用TCP进行通信。具体来说,我们依赖TCP的以下属性:有序传递:数据按照发送的顺序传送,只有在传送完m之前发送的所有邮件之后才传送消息m。(这样做的必然结果是,如果消息m丢失,m之后的所有消息都将丢失。)关闭后没有消息:一旦FIFO通道关闭,将不会收到任何消息。FLP证明,如果可能出现故障,则无法在异步分布式系统中实现共识。为确保我们在出现故障时达成共识,我们会使用超时。但是,我们依靠活力时间而不是正确性。因此,如果超时停止工作(例如时钟故障),则消息传递系统可能会挂起,但不会违反其保证。在描述ZooKeeper消息传递协议时,我们将讨论数据包,提议和消息:数据包:通过FIFO通道发送的字节序列提案:协议单位。通过与法定数量的ZooKeeper服务器交换数据包来商定提案。大多数提案都包含消息,但NEW_LEADER提案是与消息不对应的提案示例。消息:要以原子方式广播到所有ZooKeeper服务器的字节序列。在提交之前提交并同意的消息。如上所述,ZooKeeper保证了消息的总顺序,并且它还保证了提议的总顺序。ZooKeeper使用ZooKeeper事务id(zxid)公开总排序。所有提案在提议时都会加盖zxid,并准确反映总排序。提案将发送到所有ZooKeeper服务器,并在法定人数确认提案时提交。如果提案包含消息,则在提交提案时将传递消息。确认意味着服务器已将提议记录到持久存储。我们的法定人数要求任何一对仲裁必须至少有一个共同的服务器。我们通过要求所有法定人数的大小(n / 2 + 1)来确保这一点)其中n是组成ZooKeeper服务的服务器数量。zxid有两个部分:纪元和计数器。在我们的实现中,zxid是一个64位数字。我们使用高阶32位用于纪元,低阶32位用于计数器。因为它有两个部分代表zxid既是数字又是一对整数,(epoch,count)。时代数字代表了领导层的变化。每当新领导人上台时,它将拥有自己的纪元号码。我们有一个简单的算法来为一个提议分配一个唯一的zxid:领导者只需递增zxid以获得每个提案的唯一zxid。领导激活将确保只有一个领导者使用给定的纪元,因此我们的简单算法保证每个提案都具有唯一的ID。ZooKeeper消息传递包含两个阶段:领导者激活:在此阶段,领导者建立正确的系统状态,并准备开始提出建议。主动消息传递:在此阶段,领导者接受建议和协调消息传递的消息。ZooKeeper是一个整体协议。我们不关注个别提案,而是关注整个提案流。我们严格的订购使我们能够有效地完成这项工作并大大简化我们的协议。领导激活体现了这种整体概念。领导者只有在达到法定数量的粉丝时才会变得活跃(领导者也算作跟随者。你总是可以为自己投票)与领导者同步,他们拥有相同的状态。该州包括领导者认为已经提交的所有提案以及跟随领导者的提议,NEW_LEADER提案。(希望你是在想自己,领导者认为已提交的提案包括所有真正提交的提案吗?答案是肯定的。下面,我们说明原因。)领导者激活领导者激活包括领导者选举。我们目前在ZooKeeper中有两个领导者选举算法:LeaderElection和FastLeaderElection(AuthFastLeaderElection是FastLeaderElection的变体,它使用UDP并允许服务器执行简单形式的身份验证以避免IP欺骗)。只要符合以下条件,ZooKeeper消息传递并不关心选择领导者的确切方法:领导者已经看到了所有粉丝中最高的zxid。法定数量的服务器已承诺跟随领导者。在这两个要求中,只有第一个,跟随者需要保持正确操作的最高zxid。第二个要求,即法定数量的追随者,只需要很高的概率。我们将重新检查第二个要求,因此如果在领导者选举期间或之后发生失败并且法定人数丢失,我们将通过放弃领导者激活和进行另一次选举来恢复。领导者选举后,单个服务器将被指定为领导者,并开始等待粉丝连接。其余服务器将尝试连接到领导者。领导者将通过发送他们遗失的任何提案与追随者同步,或者如果追随者缺少太多提案,它将向关注者发送状态的完整快照。有一个角落案例,其中有一个跟随者有提议,U,领导人看不到。提案按顺序排列,因此U的提案的zxids高于领导者看到的zxids。追随者必须在领导人选举后到达,否则追随者将被选为领导者,因为它已经看到更高的zxid。由于提交的提案必须由法定数量的服务器看到,并且选出领导者的法定数量的服务器没有看到U,因此您的提议尚未提交,因此可以将其丢弃。当追随者连接到领导者时,领导者将告诉追随者丢弃U.一个新的领导者建立一个zxid来开始使用新的提议,通过获得它所见过的最高zxid的时代e,并设置下一个zxid用于(e + 1,0),领导者与跟随者同步后,它将提出一个NEW_LEADER提案。提交NEW_LEADER提案后,领导者将激活并开始接收和发布提案。这听起来很复杂,但这里是领导者激活过程中的基本操作规则:跟随者将在与领导者同步后确认NEW_LEADER提案。跟随者只会使用来自单个服务器的给定zxid确认NEW_LEADER提议。当法定数量的粉丝确认后,新的领导者将提交NEW_LEADER提案。当NEW_LEADER提议为COMMIT时,关注者将提交从领导者收到的任何州。在NEW_LEADER提案获得COMMITED之前,新领导者不会接受新提案。如果领导者选举错误地终止,我们就没有问题,因为由于领导者没有法定人数,因此不会提交NEW_LEADER提案。当发生这种情况时,领导者和任何剩下的追随者将超时并返回领导者选举。活动消息领导者激活完成所有繁重的工作。一旦领导者加冕,他就可以开始提出提案。只要他仍然是领导者,就不可能出现其他领导者,因为没有其他领导者能够获得法定数量的粉丝。如果新的领导者确实出现,那就意味着领导者失去了法定人数,新的领导者将清理领导激活期间留下的任何混乱。ZooKeeper消息传递的操作类似于传统的两阶段提交。所有通信通道都是FIFO,所以一切都按顺序完成。具体而言,遵守以下操作约束:领导者使用相同的订单向所有粉丝发送提案。此外,此顺序遵循收到请求的顺序。因为我们使用FIFO通道,这意味着关注者也会按顺序接收提案。关注者按照收到的顺序处理消息。这意味着将按顺序确认消息,并且由于FIFO通道,领导者将按顺序接收来自关注者的ACK。这也意味着如果消息$ m $已写入非易失性存储器,则在$ m $之前建议的所有消息都已写入非易失性存储器。一旦有法定数量的粉丝确认消息,领导者将向所有粉丝发出COMMIT。由于消息按顺序被确认,因此将由跟随者按顺序接收的领导者发送COMMIT。COMMIT按顺序处理。在提交提案时,关注者会发送提案消息。摘要你去吧 它为什么有效?具体而言,为什么新领导人认为的一系列提案总是包含任何实际提交的提案?首先,所有提议都有一个唯一的zxid,因此与其他协议不同,我们永远不必担心为同一个zxid提出两个不同的值; 粉丝(领导者也是粉丝)按顺序查看和记录提案; 提案按顺序提交; 由于粉丝一次只跟随一位领导者,所以每次只有一位活跃的领导者; 一位新领导人已经看到了上一个时代的所有承诺提案,因为它已经从法定数量的服务器中看到了最高的zxid; 新领导人看到的上一个时代的任何未提出的提议都将由该领导者在活跃之前承诺。比较这不只是Multi-Paxos吗?不,Multi-Paxos需要一些方法来确保只有一个协调员。我们不指望这种保证。相反,我们使用领导者激活来恢复领导层变革,或者让老领导人相信他们仍然活跃。这不仅仅是Paxos吗?您的活动消息传递阶段看起来就像Paxos的第2阶段?实际上,对我们来说,主动消息传递看起来就像是2阶段提交,而不需要处理中止。主动消息传递与它们具有交叉提议排序要求的意义不同。如果我们不对所有数据包保持严格的FIFO排序,它就会崩溃。此外,我们的领导者激活阶段与他们两个都不同。特别是,我们对epochs的使用允许我们跳过未提交的提议块,而不用担心给定zxid的重复提议。法定人数原子广播和领导者选举使用法定人数的概念来保证系统的一致视图。默认情况下,ZooKeeper使用多数仲裁,这意味着在其中一个协议中发生的每个投票都需要多数投票。一个例子是承认领导者提案:领导者只有在收到法定数量的服务器的确认后才能提交。如果我们从使用多数性中提取我们真正需要的属性,我们只需要保证用于通过投票验证操作的进程组(例如,确认领导者提议)在至少一个服务器中成对交叉。使用多数人保证这样的财产。但是,还有其他方法可以构建与多数群体不同的法定人数。例如,我们可以为服务器的投票分配权重,并说一些服务器的投票更重要。为了获得法定人数,我们得到足够的票数,以便所有投票的权重总和大于所有权重总和的一半。使用权重并且在广域部署(共址)中有用的不同结构是分层结构。通过这种构造,我们将服务器分成不相交的组并为进程分配权重。为了形成法定人数,我们必须从大多数G组获得足够的服务器,这样对于G中的每个组g,来自g的投票总和大于g中权重总和的一半。有趣的是,这种结构可以实现更小的法定人数。例如,如果我们有9个服务器,我们将它们分成3组,并为每个服务器分配1的权重,然后我们就可以形成大小为4的仲裁。注意,两个进程的子集各占大多数来自大多数组中的每个组的服务器必然具有非空交叉点。使用ZooKeeper,我们为用户提供配置服务器以使用多数仲裁,权重或组层次结构的功能。记录Zookeeper使用slf4j作为日志记录的抽象层。现在选择版本1.2中的log4j作为最终的日志记录实现。为了更好地嵌入支持,计划将来决定为最终用户选择最终的日志记录实现。因此,始终使用slf4j api在代码中编写日志语句,但配置log4j以了解如何在运行时进行日志记录。请注意,slf4j没有FATAL级别,FATAL级别的旧消息已移至ERROR级别。有关为ZooKeeper配置log4j的信息,请参阅ZooKeeper管理员指南的“ 日志记录”部分。开发者指南在代码中创建日志语句时,请遵循 slf4j手册。 在创建日志语句时,请阅读有关性能的常见问题解答。补丁审阅者将查找以下内容:记录在正确的级别slf4j中有多个级别的日志记录。选择正确的一个很重要。按从高到低的顺序:ERROR级别指定可能仍允许应用程序继续运行的错误事件。WARN级别表示潜在的有害情况。INFO级别指定信息性消息,以粗粒度级别突出显示应用程序的进度。DEBUG Level指定对调试应用程序最有用的细粒度信息事件。TRACE Level指定比DEBUG更细粒度的信息事件。ZooKeeper通常在生产中运行,以便将INFO级别严重性和更高(更严重)的日志消息输出到日志。使用标准的slf4j成语静态消息记录LOG.debug(“process completed successfully!”);但是,当需要创建参数化消息时,请使用格式化锚点。LOG.debug(“got {} messages in {} minutes”,new Object[]{count,time});命名记录器应以其使用的类命名。public class Foo {private static final Logger LOG = LoggerFactory.getLogger(Foo.class);….public Foo() { LOG.info(“constructing Foo”);异常处理try {// code} catch (XYZException e) {// do thisLOG.error(“Something bad happened”, e);// don’t do this (generally)// LOG.error(e);// why? because “don’t do” case hides the stack trace// continue process here as you need… recover or (re)throw}转载来源:https://github.com/apache/zoo… ...

January 23, 2019 · 1 min · jiezi

paascloud开源项目学习(2) -- centos7下安装SpringCloud+Vue环境

前言github 开源项目–paascloud-master:https://github.com/paascloud/…paascloud-master 官方环境搭建:http://blog.paascloud.net/201…基本环境rzyum install lrzszzip 和 unzipyum install -y unzip zipvimyum -y install vim* Java 环境jdk 8tar.gz包安装,参考:https://www.cnblogs.com/chy12…rpm包安装,参考:https://www.cnblogs.com/zengh…mysql 5.7下载mysql yum源 版本为5.7下载地址:https://dev.mysql.com/downloads/file/?id=470281查看yum源安装mysql版本上面下载后,yum localinstall mysql57-community-release-el7-11.noarch.rpmvim /etc/yum.repos.d/mysql-community.repo # 确定使用的版本,enable设为1yum install -y mysql-community-server启动mysqlsystemctl status mysqld.servicesystemctl start mysqld.service查看mysql密码cat /etc/my.cnf# log-error=/var/log/mysqld.log# pid-file=/var/run/mysqld/mysqld.pidcat /var/log/mysqld.log | grep password登录mysql数据库mysql -u root -p 修改密钥复杂度配置mysql> set global validate_password_policy=0;mysql> set global validate_password_length=6;修改密码mysql> alter user ‘root’@’localhost’ identified by ‘123456’;远程访问权限mysql> GRANT ALL PRIVILEGES ON . TO ‘root’@’%’ IDENTIFIED BY ‘123456’ WITH GRANT OPTION;mysql> flush privileges;mysql 备份参考博客:https://blog.csdn.net/SWPU_Li…crontab 命令:https://www.cnblogs.com/kensh…dockerdocker 在线安装非常慢,不推荐。依次执行下面命令yum remove docker docker-common docker-selinux docker-engineyum install -y yum-utils device-mapper-persistent-data lvm2yum-config-manager –add-repo https://download.docker.com/linux/centos/docker-ce.repoyum-config-manager –enable docker-ce-edgeyum-config-manager –enable docker-ce-testyum-config-manager –disable docker-ce-edgeyum makecache fastyum -y install docker-cesystemctl start dockerdocker run hello-worlddocker imagesREPOSITORY TAG IMAGE ID CREATED SIZEhello-world latest 1815c82652c0 2 months ago 1.84kBdocker 本地安装从官方安装包下载:docker-ce-17.06.0.ce-1.el7.centos.x86_64.rpm。安装yum install /usr/local/src/tool/docker-ce-17.06.0.ce-1.el7.centos.x86_64.rpm -y启动systemctl start docker查看docker版本docker -v开机启动# systemctl enable dockerCreated symlink from /etc/systemd/system/multi-user.target.wants/docker.service to /usr/lib/systemd/system/docker.service.docker 卸载查看已安装的docker安装包yum list installed|grep docker删除上面显示的安装包列表yum –y remove docker.x86_64.XXX删除docker镜像rm -rf /var/lib/dockerredis 4.0.2redis 单机tar 包安装下载,解压,编译:wget http://download.redis.io/releases/redis-4.0.2.tar.gztar xzf redis-4.0.2.tar.gzcd redis-4.0.2make二进制文件是编译完成后在 src 目录下,通过下面的命令启动 Redis 服务:src/redis-server使用内置的客户端命令 redis-cli 进行使用:# src/redis-cliredis> set foo barOKredis> get foo"bar"停止服务:# 第一种:杀死进程PID,kill -9 PIDps aux|grep redis# 第二种src/redis-cli shutdownyum 安装安装,启动yum install epel-releaseyum install redissystemctl start redis.serviceredis-server /etc/redis.confsystemctl enable redis常用配置vi /usr/local/redis-4.0.2/redis.confrequirepass paasword #配置密码# bind 127.0.0.1 #允许远程访问daemonize yes #后台启动自定义配置启动src/redis-server ../redis.confsrc/redis-cli -a paaswordredis 集群参考博客:http://blog.paascloud.net/201…docker 下安装 redisdocker run -d -p 6379:6379 redis:4.0.8 –requirepass “123456"nginx 1.14.X下载对应当前系统版本的 nginx 包wget http://nginx.org/packages/centos/7/noarch/RPMS/nginx-release-centos-7-0.el7.ngx.noarch.rpm建立 nginx 的 yum 仓库rpm -ivh nginx-release-centos-7-0.el7.ngx.noarch.rpm安装 nginxyum -y install nginx启动 nginxsystemctl start nginx版本号nginx -vnginx version: nginx/1.14.1默认配置文件路径/etc/nginx/nginx.confrocketmq 4.2.X主要是搭建集群环境同步双写(2m-2s-sync)参考博客:http://blog.paascloud.net/201…异步复制(2m-2s-async)参考博客:https://blog.csdn.net/weixin_… 注意:如果 broker 启动失败,可能是 runbroker.sh、runserver.sh 里的内存大小设置默认过大。RocketMQ Web管理界面rocketmq 提供多种管理方式,命令行和界面等,apache 提供一个开源的扩展项目: https://github.com/apache/roc… 里面包含一个子项目 rocketmq-console,配置下,打个包就可以用了。或者可以百度搜索一下rocketmq-console.war。具体安装参考博客:https://www.jianshu.com/p/e5b…rabbitmq 3.7.3参考博客:http://blog.paascloud.net/201…zookeeper 3.4.X单机、集群、伪集群:https://www.cnblogs.com/sundd…paascloue 集群环境:http://blog.paascloud.net/201…命令启动 rabbitmq/etc/init.d/rabbitmq-server start # 或 service rabbitmq-service start 启用 RabbitMQWeb 管理插件用户名/密码:guest/guest启动rabbitmq-plugins enable rabbitmq_management 访问(修改为自己 ip):http://192.168.241.101:15672/启动 zookeeper根据上面参考博客1搭建的伪集群,因为配置文件在一个机器上的 zookeeper 目录下,所以启动时对应不同的配置文件。进入zookeeper的 conf目录下cd /root/software/zookeeper-3.4.9/conf启动# 添加了环境变量zkServer.sh start zoo1.cfgzkServer.sh start zoo2.cfgzkServer.sh start zoo3.cfg查看状态zkServer.sh status zoo1.cfgzkServer.sh status zoo2.cfgzkServer.sh status zoo3.cfg启动 zookeeper 图形化界面zookeeper 图形化的客户端工具–ZooInspector,具体使用参考博客:https://blog.csdn.net/qq_2685…。启动 zookeeper 集群后,运行 ZooInspector jar 包,当 paascloud 项目启动后,出现下面效果启动 rocketmq 集群根据上面 rocketmq集群 目录下的第一个参考博客来启动。2m-2s-sync。启动 NameServer A 192.168.241.101nohup sh /usr/local/rocketmq/bin/mqnamesrv &启动 NameServer A 192.168.241.102nohup sh /usr/local/rocketmq/bin/mqnamesrv &启动 BrokerServer A-master 192.168.241.101nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a.properties&启动 BrokerServer A-slave 192.168.241.101nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a-s.properties&启动 BrokerServer B-master 192.168.241.102nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b.properties&启动 启动BrokerServer B-slave 192.168.241.102nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties&查看日志netstat -ntlpjpstail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.logtail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log停止服务sh /usr/local/rocketmq/bin/mqshutdown namesrvsh /usr/local/rocketmq/bin/mqshutdown broker清理数据rm -rf /usr/local/rocketmq/data/masterrm -rf /usr/local/rocketmq/data/slavemkdir -p /usr/local/rocketmq/data/master/store/commitlogmkdir -p /usr/local/rocketmq/data/slave/store/commitlogmkdir -p /usr/local/rocketmq/data/master/store/consumequeuemkdir -p /usr/local/rocketmq/data/slave/store/consumequeuemkdir -p /usr/local/rocketmq/data/master/store/indexmkdir -p /usr/local/rocketmq/data/slave/store/indexrocketmq 集群控制台启动解压在tomcat目录,./tomcat/bin/startup.sh 启动即可。访问地址:http://192.168.0.110:8080/roc… ...

January 23, 2019 · 2 min · jiezi

Zookeeper 集群安装配置,超详细,速度收藏!

今天,栈长分享下 Zookeeper 的集群安装及配置。下载下载地址:http://zookeeper.apache.org/下载过程就不说了,我们下载了最新的zookeeper-3.4.11。安装1、上传安装包把下载的最新的包(如:zookeeper-3.4.11.tar.gz)上传到服务器,上传的方式也不多说了。2、解压$ tar zxvf zookeeper-3.4.11.tar.gz3、移动到/usr/local目录下$ mv zookeeper-3.4.11 /usr/local/zookeeper集群配置Zookeeper集群原则上需要2n+1个实例才能保证集群有效性,所以集群规模至少是3台。下面演示如何创建3台的Zookeeper集群,N台也是如此。1、创建数据文件存储目录$ cd /usr/local/zookeeper$ mkdir data2、添加主配置文件$ cd conf$ cp zoo_sample.cfg zoo.cfg3、修改配置$ vi zoo.cfg先把dataDir=/tmp/zookeeper注释掉,然后添加以下核心配置。dataDir=/usr/local/zookeeper/dataserver.1=192.168.10.31:2888:3888server.2=192.168.10.32:2888:3888server.3=192.168.10.33:2888:38884、创建myid文件$ cd ../data$ touch myid$ echo “1”>>myid每台机器的myid里面的值对应server.后面的数字x。5、开放3个端口$ sudo /sbin/iptables -I INPUT -p tcp –dport 2181 -j ACCEPT$ sudo /sbin/iptables -I INPUT -p tcp –dport 2888 -j ACCEPT$ sudo /sbin/iptables -I INPUT -p tcp –dport 3888 -j ACCEPT$ sudo /etc/rc.d/init.d/iptables save$ sudo /etc/init.d/iptables restart$ sudo /sbin/iptables -L -nChain INPUT (policy ACCEPT)target prot opt source destination ACCEPT tcp – 0.0.0.0/0 0.0.0.0/0 tcp dpt:3888 ACCEPT tcp – 0.0.0.0/0 0.0.0.0/0 tcp dpt:2888 ACCEPT tcp – 0.0.0.0/0 0.0.0.0/0 tcp dpt:21816、配置集群其他机器把配置好的Zookeeper目录复制到其他两台机器上,重复上面4-5步。$ scp -r /usr/local/zookeeper test@192.168.10.32:/usr/local/7、重启集群$ /usr/local/zookeeper/bin/zkServer.sh start3个Zookeeper都要启动。8、查看集群状态$ /usr/local/zookeeper/bin/zkServer.sh status ZooKeeper JMX enabled by defaultUsing config: /usr/local/zookeeper/bin/../conf/zoo.cfgMode: follower客户端连接./zkCli.sh -server 192.168.10.31:2181连接本机的不用带-server。注意如果是在单机创建的多个Zookeeper伪集群,需要对应修改配置中的端口、日志文件、数据文件位置等配置信息。跟着栈长学 Zookeeper,可以在Java技术栈微信公众号回复关键字:Zookeeper,后续会陆续更新 Zookeeper 系列文章。本文原创首发于微信公众号:Java技术栈(id:javastack),关注公众号在后台回复 “java” 可获取更多,转载请原样保留本信息。 ...

January 15, 2019 · 1 min · jiezi

【zookeeper】分布式简单了解

什么是分布式分布式系统是一个硬件或者软件组件分布在不同的网络计算机上,彼此之间通过消息传递进行通信和协调的系统。分布式特点分布性分布式系统中的多台机器会在空间上随意分配,同时,机器的分布情况也会随时变动对等性分布式系统的计算机没有主从之分,即没有控制整个系统的主机,也没有被控制的从机,组成分布式系统的机器节点都是对等。副本是分布式系统最常见的概念,即分布式系统对数据和服务会提供一种冗余方式。并发性分布式系统中的多个节点并发操作共享资源。缺乏全局时钟故障总会发生分布式系统中常见问题通信异常在分布式系统各节点之间需要通过网络进行通信,网络通信都会伴随着位置风险,网络光纤、路由器或是DNS硬件故障都会导致通信异常分布式节点之间通信延迟脑裂当网络异常发生时,导致分布式系统中只有部分节点能够正常通信,我们俗称脑裂。三态分布式系统中每一次请求与响应都会出现以下三种状态:成功失败超时超时出现的情况有哪些?由于网路原因,请求消息没有被成功的发送到接收方,而是在发送过程中就出现了消息丢失的现象请求消息被接收方成功接收,并进行了处理,但是在将响应反馈给发送方的过程中出现了异常导致消息丢失节点故障分布式系统中的机器节点出现宕机或者“僵死”的现象

January 12, 2019 · 1 min · jiezi

使用 Exhibitor 监控管理 ZooKeeper

前言Exhibitor 是 Netflix 开源的一个用于 ZooKeeper 配置监控和管理的系统。现在 Netflix Exhibitor 已经成为社区开源公共维护项目 Soabase Exhibitor。Exhibitor 是 ZooKeeper 实例监控,备份,恢复,清理和可视化工具,是 ZooKeeper 的监控管理系统。使用 Exhibitor 监控管理 ZooKeeper更新历史2019年01月07日 - 初稿阅读原文 - https://wsgzao.github.io/post…扩展阅读exhibitor - https://github.com/soabase/ex...exhibitor简介ZooKeeper co-process for instance monitoring, backup/recovery, cleanup and visualization.Exhibitor is a Java supervisor system for ZooKeeper. It provides a number of features:Watches a ZK instance and makes sure it is runningPerforms periodic backupsPerform periodic cleaning of ZK log directoryA GUI explorer for viewing ZK nodesA rich REST APIhttps://github.com/soabase/ex…exhibitor特性Exhibitor 主要包括以下特性 / 功能:实例监控Exhibitor 实例监控在同一服务器上运行的 ZooKeeper 服务器。如果 ZK 没有运行,Exhibitor 会写入 zoo.cfg 文件(请参阅下面的 ZK 集群配置)并启动它。如果 ZooKeeper 由于某种原因崩溃,Exhibitor 也会重新启动它。日志清理在 ZooKeeper 3.4.x 之前的版本中,日志文件需要维护,Exhibitor 会负责定期维护。备份 / 还原ZooKeeper 集群中的备份比传统数据存储(例如 RDBMS)更复杂。一般来说,ZooKeeper 中的大部分数据是短暂的。盲目恢复整个 ZooKeeper 数据集可能会造成更大危害,因此,需要选择性的恢复以防止对数据集的子集造成意外损坏。Exhibitor 提供了这一功能。Exhibitor 会定期备份 ZooKeeper 的事务文件,备份后,就可以对这些事务文件建立索引。集群配置Exhibitor 为整个 Zookeeper 集群提供了一个独立的控制台,通过它所做的配置更改会对整个集群有效。以下是一些共享配置值:NameDescriptionZooKeeper Install DirPathto the ZooKeeper server installationZooKeeper Data DirPath where ZooKeeper should store its dataLog Index DirPath where indexed transaction logs should be keptServersList of servers/server-ids in the ensembleAdditional ConfigAdditional fields/values to store in zoo.cfg集群滚动升级Exhibitor 可以以滚动方式更新集群中的服务器,以便在进行更改时让 ZooKeeper 集群确保 Quorum 设定的最低服务能力。自动实例管理Exhibitor 可以配置为自动向集群中添加新实例,并删除陈旧的实例。这使得 ZooKeeper 集群可以实现 “无接触交钥匙管理”。可视化Exhibitor 为 ZooKeeper 提供了 ZNode 层次结构的图形树视图。ZK 数据维护启用后,维护人员可以在 ZooKeeper 的存储层次结构中创建 / 更新 / 删除节点。死锁检测当使用 Curator 的锁方案(或类似)时,Exhibitor 可以分析一组表示锁的 ZNode,并确定是否存在潜在的死锁。Curator 集成Exhibitor 和 Curator 可以集成工作,当集群中的信息变更时,Curator 实例可以同步更新。REST APIExhibitor 提供了一组用于程序集成的 REST API。重要事项使用 Exhibitor 时,不要手动编辑 Zookeeper 的 zoo.cfg 和 myid 文件,因为 Exhibitor 会覆盖它们。除了标准的 ZooKeeper 端口,防火墙必须打开 Exhibitor 使用的 HTTP 端口,因为每个 Exhibitor 实例需要与其他参与者通信传递状态。使用Exhibitor 的部署构件可以从 Maven 仓库获取。构件分为两种:GroupID/OrgArtifactID/NameDescriptioncom.netflix.exhibitorexhibitor-standalone自包含的,可执行的 Exhibitor 版本(可以是独立应用,也可以是 War)com.netflix.exhibitorexhibitor-core类库的形式,可以嵌入到应用中exhibitor编译安装官方分享了Maven和Gradle两种build方法,这里以Maven为例https://github.com/soabase/ex…# install mavenyum install -y maven# build exhibitormkdir exhibitorcd exhibitor/wget https://github.com/soabase/exhibitor/archive/exhibitor-1.7.1.zipunzip exhibitor-1.7.1.zipcd exhibitor-exhibitor-1.7.1/exhibitor-standalone/src/main/resources/buildscripts/standalone/mavenmvn clean package[INFO] Replacing original artifact with shaded artifact.[INFO] Replacing /root/exhibitor/exhibitor-exhibitor-1.7.1/exhibitor-standalone/src/main/resources/buildscripts/standalone/maven/target/exhibitor-1.6.0.jar with /root/exhibitor/exhibitor-exhibitor-1.7.1/exhibitor-standalone/src/main/resources/buildscripts/standalone/maven/target/exhibitor-1.6.0-shaded.jar[INFO] ————————————————————————[INFO] BUILD SUCCESS[INFO] ————————————————————————[INFO] Total time: 44.624s[INFO] Finished at: Tue Jan 08 11:28:59 SGT 2019[INFO] Final Memory: 15M/94M[INFO] ————————————————————————cp target/exhibitor-1.6.0.jar /tmp# Once built, Exhibitor is completely self-contained and can be run from the command line:java -jar <path>/exhibitor-xxx.jar -c file[root@localhost ~]# java -jar exhibitor-1.6.0.jar -c filev1.6.0INFO com.netflix.exhibitor.core.activity.ActivityLog Exhibitor started [main]Jan 08, 2019 11:32:38 AM java.util.prefs.FileSystemPreferences$1 runINFO: Created user preferences directory.INFO org.mortbay.log Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog [main]INFO org.mortbay.log jetty-1.6.0 [main]Jan 08, 2019 11:32:38 AM com.sun.jersey.server.impl.application.WebApplicationImpl _initiateINFO: Initiating Jersey application, version ‘Jersey: 1.18.3 12/01/2014 08:23 AM’INFO org.mortbay.log Started SocketConnector@0.0.0.0:8080 [main]Jan 08, 2019 11:33:00 AM java.util.prefs.FileSystemPreferences$6 runWARNING: Prefs file removed in background /root/.java/.userPrefs/prefs.xmlINFO com.netflix.exhibitor.core.activity.ActivityLog State: latent [ActivityQueue-0]# You can test that it’s running correctly by going to this URL in a browser: http://localhost:8080/exhibitor/v1/ui/index.htmlhttp://192.168.56.103:8080/exhibitor/v1/ui/index.html管理zookeeper集群如果需要通过 Exhibitor 管理 zookeeper 集群需要在集群的每个机器上安装 Exhibitorhttps://github.com/soabase/ex…重点提一下Ensemble中的Servers配置:服务器IP之间用逗号分隔,有两种类型:S表示标准类型,O表示Observer观察者S:1:192.168.56.101,S:2:192.168.56.102,S:3:192.168.56.103 ...

January 8, 2019 · 2 min · jiezi

ZooKeeper集群配置部署

配置zookeeper进入zookerper-3.4.7目录,执行cp conf/zoo_sample.cfg conf/zoo.cfgvim conf/zoo.cfg配置文件如下:tickTime=2000initLimit=10syncLimit=5dataDir=/home/hadoopadmin/temp/zookeeper/clientPort=2181server.1=spark01:2888:3888 server.2=spark02:2888:3888 server.3=spark03:2888:3888修改完成后拷贝配置内容,依次复制到其他Server下相关配置介绍* tickTime:CS通信心跳数;以毫秒为单位,可以使用默认配置。* initLimit:LF初始通信时限;* syncLimit:LF同步通信时限;数值不宜过高。* dataDir:数据文件目录;* dataLogDir:日志文件目录;* clientPort:客户端连接端口;* server.N:服务器名称与地址(服务编号,服务地址,LF通信端口,选举端口)ZooKeeper高级配置* gloabalOutstandingLimit:最大请求堆积属,默认1000;* preAllocSize:预分配的Transaction log空间大小;* snapCount:每进行snapCount次事务日志输出后,触发一次快照;* maxClientCnxns:最大并发客户端数;* forceSync:是否提交事务的同时同步到磁盘;* leaderServes:是否禁止leader读功能;* traceFile:是否记录所有请求的log;不建议使用创建目录mkdir /home/hadoopadmin/temp/zookeeper建立Zookeeper节点标识文件myid创建myid编号,依次在每台Server上执行,注意每台Server的myid要对应正确的编号host1下输入echo “1” > /home/hadoopadmin/temp/zookeeper/myidhost2下输入echo “2” > /home/hadoopadmin/temp/zookeeper/myidhost3下输入echo “3” > /home/hadoopadmin/temp/zookeeper/myid配置环境变量并使环境变量立即生效vim /etc/profile在末尾加入以下命令(只有root用户才可以改哦)export ZOOKEEPER_HOME=/home/saprk01/bigdata/zookeeper-3.4.10export PATH=$ZOOKEEPER_HOME/bin:$PATH完成后保存退出,并使环境变量立即生效source /etc/profile同样,在其它Server也要做同样的操作。启动并监控状态启动ZooKeeperzkServer.sh start检查ZooKeeper的状态zkServer.sh status这里需要注意下,各台Server的防火墙要关闭,要不可能会报错。关闭防火墙方法:service iptables stop执行后查询是否关闭service iptables status集群启动脚本这里我写了一个集群启动脚本,基础环境(host、ssh、jdk等)都配置正确时,可以简单修改考虑使用./start-all.sh start #启动每一台机器上的kafka./start-all.sh status #查看启动状态#!/bin/bash –login#需要改成集群的机器名称ZOOKEEPERS=“spark01 spark02 spark03”# 执行前需要在/etc/environment 配置jdk环境command=$1start $command $ZOOKEEPERSstart(){ for zookeeper in $2 do echo “$1 zookeeper on $zookeeper” #需要修改一下目录 ssh -l hadoopadmin $zookeeper “/home/hadoopadmin/zookeeper-3.4.10/bin/zkServer.sh $1” #ssh -l hadoopadmin $zookeeper “java -version” done} ...

January 2, 2019 · 1 min · jiezi

dubbo源码解析(十八)远程通信——Zookeeper

远程通讯——Zookeeper目标:介绍基于zookeeper的来实现的远程通信、介绍dubbo-remoting-zookeeper内的源码解析。前言对于zookeeper我相信肯定不陌生,在之前的文章里面也有讲到zookeeper来作为注册中心。在这里,基于zookeeper来实现远程通讯,duubo封装了zookeeper client,来和zookeeper server通讯。下面是类图:源码分析(一)ZookeeperClientpublic interface ZookeeperClient { /** * 创建client * @param path * @param ephemeral / void create(String path, boolean ephemeral); /* * 删除client * @param path / void delete(String path); /* * 获得子节点集合 * @param path * @return / List<String> getChildren(String path); /* * 向zookeeper的该节点发起订阅,获得该节点所有 * @param path * @param listener * @return / List<String> addChildListener(String path, ChildListener listener); /* * 移除该节点的子节点监听器 * @param path * @param listener / void removeChildListener(String path, ChildListener listener); /* * 新增状态监听器 * @param listener / void addStateListener(StateListener listener); /* * 移除状态监听 * @param listener / void removeStateListener(StateListener listener); /* * 判断是否连接 * @return / boolean isConnected(); /* * 关闭客户端 / void close(); /* * 获得url * @return / URL getUrl();}该接口是基于zookeeper的客户端接口,其中封装了客户端的一些方法。(二)AbstractZookeeperClient该类实现了ZookeeperClient接口,是客户端的抽象类,它实现了一些公共逻辑,把具体的doClose、createPersistent等方法抽象出来,留给子类来实现。1.属性/* * url对象 /private final URL url;/* * 状态监听器集合 /private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>();/* * 客户端监听器集合 /private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>();/* * 是否关闭 /private volatile boolean closed = false;2.create@Overridepublic void create(String path, boolean ephemeral) { // 如果不是临时节点 if (!ephemeral) { // 判断该客户端是否存在 if (checkExists(path)) { return; } } // 获得/的位置 int i = path.lastIndexOf(’/’); if (i > 0) { // 创建客户端 create(path.substring(0, i), false); } // 如果是临时节点 if (ephemeral) { // 创建临时节点 createEphemeral(path); } else { // 递归创建节点 createPersistent(path); }}该方法是创建客户端的方法,其中createEphemeral和createPersistent方法都被抽象出来。具体看下面的类的介绍。3.addStateListener@Overridepublic void addStateListener(StateListener listener) { // 状态监听器加入集合 stateListeners.add(listener);}该方法就是增加状态监听器。4.close@Overridepublic void close() { if (closed) { return; } closed = true; try { // 关闭 doClose(); } catch (Throwable t) { logger.warn(t.getMessage(), t); }}该方法是关闭客户端,其中doClose方法也被抽象出。/* * 关闭客户端 /protected abstract void doClose();/* * 递归创建节点 * @param path /protected abstract void createPersistent(String path);/* * 创建临时节点 * @param path /protected abstract void createEphemeral(String path);/* * 检测该节点是否存在 * @param path * @return /protected abstract boolean checkExists(String path);/* * 创建子节点监听器 * @param path * @param listener * @return /protected abstract TargetChildListener createTargetChildListener(String path, ChildListener listener);/* * 为子节点添加监听器 * @param path * @param listener * @return /protected abstract List<String> addTargetChildListener(String path, TargetChildListener listener);/* * 移除子节点监听器 * @param path * @param listener /protected abstract void removeTargetChildListener(String path, TargetChildListener listener);上述的方法都是被抽象的,又它的两个子类来实现。(三)ZkclientZookeeperClient该类继承了AbstractZookeeperClient,是zk客户端的实现类。1.属性/* * zk客户端包装类 /private final ZkClientWrapper client;/* * 连接状态 /private volatile KeeperState state = KeeperState.SyncConnected;该类有两个属性,其中client就是核心所在,几乎所有方法都调用了client的方法。2.构造函数public ZkclientZookeeperClient(URL url) { super(url); // 新建一个zkclient包装类 client = new ZkClientWrapper(url.getBackupAddress(), 30000); // 增加状态监听 client.addListener(new IZkStateListener() { /* * 如果状态改变 * @param state * @throws Exception / @Override public void handleStateChanged(KeeperState state) throws Exception { ZkclientZookeeperClient.this.state = state; // 如果状态变为了断开连接 if (state == KeeperState.Disconnected) { // 则修改状态 stateChanged(StateListener.DISCONNECTED); } else if (state == KeeperState.SyncConnected) { stateChanged(StateListener.CONNECTED); } } @Override public void handleNewSession() throws Exception { // 状态变为重连 stateChanged(StateListener.RECONNECTED); } }); // 启动客户端 client.start();}该方法是构造方法,同时在里面也做了创建客户端和启动客户端的操作。其他方法都是实现了父类抽象的方法,并且调用的是client方法,为举个例子:@Overridepublic void createPersistent(String path) { try { // 递归创建节点 client.createPersistent(path); } catch (ZkNodeExistsException e) { }}该方法是递归场景节点,调用的就是client.createPersistent(path)。(四)CuratorZookeeperClient该类是Curator框架提供的一套高级API,简化了ZooKeeper的操作,从而对客户端的实现。1.属性/* * 框架式客户端 /private final CuratorFramework client;2.构造方法public CuratorZookeeperClient(URL url) { super(url); try { // 工厂创建者 CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(url.getBackupAddress()) .retryPolicy(new RetryNTimes(1, 1000)) .connectionTimeoutMs(5000); String authority = url.getAuthority(); if (authority != null && authority.length() > 0) { builder = builder.authorization(“digest”, authority.getBytes()); } // 创建客户端 client = builder.build(); // 添加监听器 client.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState state) { // 如果为状态为lost,则改变为未连接 if (state == ConnectionState.LOST) { CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED); } else if (state == ConnectionState.CONNECTED) { // 改变状态为连接 CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED); } else if (state == ConnectionState.RECONNECTED) { // 改变状态为未连接 CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED); } } }); // 启动客户端 client.start(); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); }}该方法是构造方法,同样里面也包含了客户端创建和启动的逻辑。其他的方法也一样是实现了父类的抽象方法,举个列子:@Overridepublic void createPersistent(String path) { try { client.create().forPath(path); } catch (NodeExistsException e) { } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); }}(五)ZookeeperTransporter@SPI(“curator”)public interface ZookeeperTransporter { /* * 连接服务器 * @param url * @return / @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) ZookeeperClient connect(URL url);}该方法是zookeeper的信息交换接口。同样也是一个可扩展接口,默认实现CuratorZookeeperTransporter类。(六)ZkclientZookeeperTransporterpublic class ZkclientZookeeperTransporter implements ZookeeperTransporter { @Override public ZookeeperClient connect(URL url) { // 新建ZkclientZookeeperClient实例 return new ZkclientZookeeperClient(url); }}该类实现了ZookeeperTransporter,其中就是创建了ZkclientZookeeperClient实例。(七)CuratorZookeeperTransporterpublic class CuratorZookeeperTransporter implements ZookeeperTransporter { @Override public ZookeeperClient connect(URL url) { // 创建CuratorZookeeperClient实例 return new CuratorZookeeperClient(url); }}该接口实现了ZookeeperTransporter,是ZookeeperTransporter默认的实现类,同样也是创建了;对应的CuratorZookeeperClient实例。(八)ZkClientWrapper该类是zk客户端的包装类。1.属性/* * 超时事件 /private long timeout;/* * zk客户端 /private ZkClient client;/* * 客户端状态 /private volatile KeeperState state;/* * 客户端线程 /private ListenableFutureTask<ZkClient> listenableFutureTask;/* * 是否开始 /private volatile boolean started = false;2.构造方法public ZkClientWrapper(final String serverAddr, long timeout) { this.timeout = timeout; listenableFutureTask = ListenableFutureTask.create(new Callable<ZkClient>() { @Override public ZkClient call() throws Exception { // 创建zk客户端 return new ZkClient(serverAddr, Integer.MAX_VALUE); } });}设置了超时时间和客户端线程。3.startpublic void start() { // 如果客户端没有开启 if (!started) { // 创建连接线程 Thread connectThread = new Thread(listenableFutureTask); connectThread.setName(“DubboZkclientConnector”); connectThread.setDaemon(true); // 开启线程 connectThread.start(); try { // 获得zk客户端 client = listenableFutureTask.get(timeout, TimeUnit.MILLISECONDS); } catch (Throwable t) { logger.error(“Timeout! zookeeper server can not be connected in : " + timeout + “ms!”, t); } started = true; } else { logger.warn(“Zkclient has already been started!”); }}该方法是客户端启动方法。4.addListenerpublic void addListener(final IZkStateListener listener) { // 增加监听器 listenableFutureTask.addListener(new Runnable() { @Override public void run() { try { client = listenableFutureTask.get(); // 增加监听器 client.subscribeStateChanges(listener); } catch (InterruptedException e) { logger.warn(Thread.currentThread().getName() + " was interrupted unexpectedly, which may cause unpredictable exception!”); } catch (ExecutionException e) { logger.error(“Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!”, e); } } });}该方法是为客户端添加监听器。其他方法都是对于 客户端是否还连接的检测,可自行查看代码。(九)ChildListenerpublic interface ChildListener { /* * 子节点修改 * @param path * @param children / void childChanged(String path, List<String> children);}该接口是子节点的监听器,当子节点变化的时候会用到。(十)StateListenerpublic interface StateListener { int DISCONNECTED = 0; int CONNECTED = 1; int RECONNECTED = 2; /* * 状态修改 * @param connected */ void stateChanged(int connected);}该接口是状态监听器,其中定义了一个状态更改的方法以及三种状态。后记该部分相关的源码解析地址:https://github.com/CrazyHZM/i…该文章讲解了基于zookeeper的来实现的远程通信、介绍dubbo-remoting-zookeeper内的源码解析,关键需要对zookeeper有所了解。该篇之后,远程通讯的源码解析就先到这里了,其实大家会发现,如果能够对讲解api系列的文章了解透了,那么后面的文章九很简单,就好像轨道铺好,可以直接顺着轨道往后,根本没有阻碍。接下来我将开始对rpc模块进行讲解。 ...

December 29, 2018 · 4 min · jiezi

微服务架构实践:从零搭建网站扫码登录

微信扫码登录大家都是应用比较多的登录方式了,现在大的购物网站像京东、淘宝等都支持使用APP扫码登录网站了。今天就用APP扫码登录网站的实例来举例说明微服务架构的搭建过程。微服务架构应该是什么样子在这之前先看一看一个微服务架构落地以后应该是什么样子的。平常所有的微服务架构更多的是从框架来讲的像Dubbo,SpringCloud等,从整个SpringCloud的生态来讲它也只包含微服务的一部分。因为微服务的拆分不可避免的造成了系统的复杂性,团队间的合作管理和持续的交付等等,都是一项比较复杂的工程,如果没有好的团队管理规范和持续交付的流程等微服务是很难落地的。下面简单介绍一下上图中微服务架构的每一层的功能和作用:基础设施层,这一项除非自己搭建IDC,基本上现在的阿里云、腾讯云和百度云等都已经很好的支撑,特别是对于小的公司来说,更节省成本。平台服务层,对于现有的微服务能够快速动态部署那就是Docker了,再加上现有k8s等容器管理工具等,更是让微服务的部署如虎添翼,如果系统已经达到已经规模以后,可以考虑使用此种方式进行动态的扩容,一般情况下使用Docker就能解决部署问题了。支撑服务层,这一层跟微服务框架贴的非常近了,像SpringCloud已经自带了很多功能,像注册中心、配置中心、熔断限流和链路跟踪等,Dubbo也自带注册中心。业务服务层,这一层主要解决的是业务系统如何使用微服务进行解耦,各业务模块间如何进行分层交互等,形成了以基础服务模块为底层和以聚合服务为前端的“大中台小前台”的产品策略。网关服务层,这一层解决了权限控制、外部调用如何进行模块的负载均衡,可以实现在该层实现权限和流量的解耦,来满足不同的端的流量和权限不同的需求。接入层,该层主要是为了解决相同网关多实例的负载均衡的问题,防止单点故障灯。微服务开发框架,现在流行的微服务框架主要是SpringCloud和Dubbo,SpingCloud提供了更加完整的生态,Dubbo更适合内部模块间的快速高并发的调用。持续交付流水线,快速进行需求迭代,从提交代码到部署上线,能够快速的交付。工程实践与规范,这一项做不好,那整个微服务实施起来绝对是痛不欲生啊,基础模块如何定义,基础模块如何与其他模块解耦,如何进行版本的管理这个我在之前的使用Git和Maven进行版本管理和迭代的方法进行了说明。端到端的工具链,这里就是敏捷运维工具,从研发代码到最终上线到生产环境,任何一部都要有工具去实现完成,实现点一个按钮就能最终上线的系统。以上讲了实现微服务架构应该要做哪些事情,现在可以想想你的微服务架构到底落地到生成程度了,闲话少说,书归正传,今天是用APP扫码登录网站这个功能来进行举例说明应该从哪些方面进行微服务的落地实践。网站扫码登录功能这个功能是指在网站上选择使用二维码扫码登录,网站展示二维码,使用已经登录的应用APP扫码并确认登录后,网站就能登录成功,这既简单快捷,又提高了安全性。现在实现扫码登录网站的技术基本上有两种,一种就是轮询,另一种就是长连接,长连接又分为服务器端单向通信和双向通信两种,服务端单向通信只能由服务器端向客户端一直发送数据,双向通信是客户端和服务器端可以相互发送数据。像微信、京东和淘宝都是采用轮询的方式进行扫码登录的,一直使用轮询的方式在请求服务器端。今天我设计的这个扫码登录的功能,是采用的长连接能够双向通信的WebSocket的方式实现的。网站扫码实现流程1.用户在网站上登录时选择扫码登录。2.服务器端收到请求,生成一个临时的令牌,前端生成带令牌的链接地址的二维码,在浏览器上显示。3.PC端同时要与后台建立起websocket连接,等待后台发送登录成功的指令过来。4.用户用应用扫码,这个时候如果已经登陆过,后台就能获取到当前用户的token,如果没有登录到系统中,需要提前做登录。5.用户在应用APP上已经显示了是否确认登录的按钮。6.用户点击确认按钮,应用APP发起后端的api调用。7.后端接收到调用,根据临时名牌向websocket模块发送当前用户的token,pc端接收到登录成功,跳转到用户个人首页。如果用户点击了取消按钮,会根据uid向websocket模块发送取消登录的指令。技术的选型1.微服务框架的选择现在比较流行的是SpringCloud和Dubbo这两个框架,RPC的微服务框架还有Motan都不错,这里我使用SpringCloud和Dubbo这两个框架,使用SpringCloud实现网关和聚合服务模块并对外提供http服务,使用Dubbo实现内部模块间的接口调用。注册中心使用Zookeeper,Zookeeper能够同时支持SpringCloud和Dubbo进行注册。2.Websocket框架选择其实Spring现在已经具备websocket的功能了,但是我没有选择使用它,因为它只是实现了websocket的基本功能,像websocket的集群,客户端的管理等等,使用spring实现的话都得从零开始写。之前就一直使用netty-socketio做websocket的开发,它具备良好的集群、客户端管理等功能,而且它本身通知支持轮询和websocket两种方式,所以选它省事省时。3.存储的选择临时令牌存放在redis中,用来进行websocket连接时的验证,防止恶意的攻击,用户数据放在mysql中。4.源码管理工具和构建工具的选择使用Git作为代码管理工具,方便进行代码持续迭代和发布上线,使用Gitlab作为源码服务器端,可以进行代码的合并管理,使整个代码质量更容易把控。采用Maven做为构建工具,并使用nexus创建自己的Maven私服,用来进行基础服务版本的管理和发布。搭建Sonar服务器,Maven中集成Sonar插件进行代码质量的自动化检测。5.持续构建和部署工具采用Docker部署的方式,快速方便。采用Jekins做持续构建,可以根据git代码变更快速的打包上线。模块功能设计根据《微服务架构:如何用十步解耦你的系统?》中微服务解耦的设计原则:1.将Websocket作为服务独立出来只用来进行数据的通信,保证其功能的单一性,独立对外提供SocketApi接口,通过Dubbo的方式来调用其服务。2.将用户功能作为服务独立出来,进行用户注册和登录的功能,并对外提供UserApi接口,通过Dubbo的方式来调用。3.对外展示的功能包括页面和静态文件都统一到WebServer模块中,需要操作用户数据或者需要使用Websocket进行通信的都统一使用Dubbo调用。4.对于基本的权限认证和动态负载均衡都统一放到Gateway模块中,Gateway可以实现http的负载均衡和websocket的负载均衡。5.如果访问量非常大时,就考虑将Gateway分开部署,单独进行http服务和websocket服务,将两者的流量解耦。6.webserver端访问量大时,可以考虑将静态页面发布到CDN中,减少该模块的负载。开发规范解耦公共服务指定良好的开发管理规范,使用Git做好版本代码的分支管理,每个需求迭代使用单独的分支,保证每次迭代都可以独立上线,Maven私服中每次SocketApi和UserApi的升级都要保留历史版本可用,Dubbo服务做好多版本的兼容支持,这样就能将基础公共的服务进行解耦。总结微服务的引入不仅仅是带来了好处,同时也带来了系统的复杂性,不能只从框架和代码的角度来考虑微服务架构的落地,更要从整个管理的角度去考虑如何括地,否则使用微服务开发只会带来更多麻烦和痛苦。长按二维码,关注公众号煮酒科技(xtech100),输入数字11返回代码哟。

December 10, 2018 · 1 min · jiezi

聊聊curator recipes的LeaderLatch

序本文主要研究一下curator recipes的LeaderLatch实例 @Test public void testCuratorLeaderLatch() throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient(“localhost:2181”, new ExponentialBackoffRetry(1000, 3)); client.start(); String leaderLockPath = “/leader-lock2”; List<LeaderLatch> latchList = IntStream.rangeClosed(1,10) .parallel() .mapToObj(i -> new LeaderLatch(client,leaderLockPath,“client”+i)) .collect(Collectors.toList()); latchList.parallelStream() .forEach(latch -> { try { latch.start(); } catch (Exception e) { e.printStackTrace(); } }); TimeUnit.SECONDS.sleep(5); Iterator<LeaderLatch> iterator = latchList.iterator(); while (iterator.hasNext()){ LeaderLatch latch = iterator.next(); if(latch.hasLeadership()){ System.out.println(latch.getId() + " hasLeadership"); try { latch.close(); } catch (IOException e) { e.printStackTrace(); } iterator.remove(); } } TimeUnit.SECONDS.sleep(5); latchList.stream() .filter(latch -> latch.hasLeadership()) .forEach(latch -> System.out.println(latch.getId() + " hasLeadership")); Participant participant = latchList.get(0).getLeader(); System.out.println(participant); TimeUnit.MINUTES.sleep(15); latchList.stream() .forEach(latch -> { try { latch.close(); } catch (IOException e) { e.printStackTrace(); } }); client.close(); }zkCli查询[zk: localhost:2181(CONNECTED) 17] ls /[leader-lock1, leader-lock2, zookeeper, leader-lock][zk: localhost:2181(CONNECTED) 18] ls /leader-lock2[_c_4e86edb9-075f-4e18-a00c-cbf4fbf11b23-latch-0000000048, _c_b53efe1b-39ba-48df-8edb-905ddcccf5c9-latch-0000000042, _c_5ea234cc-8350-47ef-beda-8795694b62f6-latch-0000000045, _c_5f3330d9-384c-4abf-8f3e-21623213a374-latch-0000000044, _c_3fdec032-b8a4-44b9-9a9f-20285553a23e-latch-0000000049, _c_97a53125-0ab1-48ea-85cc-cdba631ce20f-latch-0000000047, _c_2bb56be2-ba17-485e-bbd3-10aa1d6af57c-latch-0000000043, _c_93fb732d-541b-48c6-aca7-dd2cd9b6f93e-latch-0000000041, _c_e09f0307-344c-4041-ab71-d68e10a48d02-latch-0000000046, _c_754a4f90-b03c-4803-915b-0654ad35ec9f-latch-0000000040]LeaderLatch.startcurator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java /** * Add this instance to the leadership election and attempt to acquire leadership. * * @throws Exception errors / public void start() throws Exception { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), “Cannot be started more than once”); startTask.set(AfterConnectionEstablished.execute(client, new Runnable() { @Override public void run() { try { internalStart(); } finally { startTask.set(null); } } })); } private synchronized void internalStart() { if ( state.get() == State.STARTED ) { client.getConnectionStateListenable().addListener(listener); try { reset(); } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); log.error(“An error occurred checking resetting leadership.”, e); } } } @VisibleForTesting void reset() throws Exception { setLeadership(false); setNode(null); BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { if ( debugResetWaitLatch != null ) { debugResetWaitLatch.await(); debugResetWaitLatch = null; } if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { setNode(event.getName()); if ( state.get() == State.CLOSED ) { setNode(null); } else { getChildren(); } } else { log.error(“getChildren() failed. rc = " + event.getResultCode()); } } }; client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id)); }这里start方法表示参与选举,reset方法通过forPath创建子节点这里ZKPaths.makePath(latchPath, LOCK_NAME)返回的是/latchPath/latch-这里有个callback主要做getChildren处理CreateBuilderImpl.forPathcurator-framework-4.0.1-sources.jar!/org/apache/curator/framework/imps/CreateBuilderImpl.java @VisibleForTesting static final String PROTECTED_PREFIX = “c”; @Override public String forPath(final String givenPath, byte[] data) throws Exception { if ( compress ) { data = client.getCompressionProvider().compress(givenPath, data); } final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential())); List<ACL> aclList = acling.getAclList(adjustedPath); client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList); String returnPath = null; if ( backgrounding.inBackground() ) { pathInBackground(adjustedPath, data, givenPath); } else { String path = protectedPathInForeground(adjustedPath, data, aclList); returnPath = client.unfixForNamespace(path); } return returnPath; } @VisibleForTesting String adjustPath(String path) throws Exception { if ( doProtected ) { ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); String name = getProtectedPrefix(protectedId) + pathAndNode.getNode(); path = ZKPaths.makePath(pathAndNode.getPath(), name); } return path; } private static String getProtectedPrefix(String protectedId) { return PROTECTED_PREFIX + protectedId + “-”; }如果CuratorFramework创建的时候没有指定的namespace的话,这里client.fixForNamespace返回原值adjustPath对于需要doProtected的进行处理,添加上PROTECTED_PREFIX以及protectedId(UUID)还有-,比如原来是latch-,处理之后变为_c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-之后由于创建的是EPHEMERAL_SEQUENTIAL,因而最后会添加上编号,比如/leader-lock2/_c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-0000000045,而节点的值为LeaderLatch指定的idLeaderLatch.getChildrencurator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java private void getChildren() throws Exception { BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { checkLeadership(event.getChildren()); } } }; client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null)); } private void checkLeadership(List<String> children) throws Exception { final String localOurPath = ourPath.get(); List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; if ( ourIndex < 0 ) { log.error(“Can’t find our node. Resetting. Index: " + ourIndex); reset(); } else if ( ourIndex == 0 ) { setLeadership(true); } else { String watchPath = sortedChildren.get(ourIndex - 1); Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) ) { try { getChildren(); } catch ( Exception ex ) { ThreadUtils.checkInterrupted(ex); log.error(“An error occurred checking the leadership.”, ex); } } } }; BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) { // previous node is gone - reset reset(); } } }; // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath)); } }这里主要是调用了checkLeadership方法,该方法对于index为0的标记为leader,对于index大于0的则添加watch,watch的路径为前一个节点,如果前一个节点被删除了,则重新触发getChildren方法这里还注册一个callback,如果前一个节点被删除,则重新触发reset操作LeaderLatch.closecurator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java /* * Remove this instance from the leadership election. If this instance is the leader, leadership * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch * instances must eventually be closed. * * @throws IOException errors / @Override public void close() throws IOException { close(closeMode); } /* * Remove this instance from the leadership election. If this instance is the leader, leadership * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch * instances must eventually be closed. * * @param closeMode allows the default close mode to be overridden at the time the latch is closed. * @throws IOException errors */ public synchronized void close(CloseMode closeMode) throws IOException { Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), “Already closed or has not been started”); Preconditions.checkNotNull(closeMode, “closeMode cannot be null”); cancelStartTask(); try { setNode(null); client.removeWatchers(); } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); throw new IOException(e); } finally { client.getConnectionStateListenable().removeListener(listener); switch ( closeMode ) { case NOTIFY_LEADER: { setLeadership(false); listeners.clear(); break; } default: { listeners.clear(); setLeadership(false); break; } } } } private synchronized void setLeadership(boolean newValue) { boolean oldValue = hasLeadership.getAndSet(newValue); if ( oldValue && !newValue ) { // Lost leadership, was true, now false listeners.forEach(new Function<LeaderLatchListener, Void>() { @Override public Void apply(LeaderLatchListener listener) { listener.notLeader(); return null; } }); } else if ( !oldValue && newValue ) { // Gained leadership, was false, now true listeners.forEach(new Function<LeaderLatchListener, Void>() { @Override public Void apply(LeaderLatchListener input) { input.isLeader(); return null; } }); } notifyAll(); }close方法用于将该LeaderLatch退出选举,如果该latch是leader,则需要释放leadershipclose方法首先cancel掉StartTask,设置节点值为null,然后移除了watcher以及ConnectionStateListener,最后设置leadership为false,然后触发相关listener注意如果closeMode是NOTIFY_LEADER,则先设置leadership为false,触发相关listener之后再移除listener;否则是先移除listener,再设置为falsesetLeadership根据新旧值调用listener.notLeader()或者input.isLeader()ConnectionStateListenercurator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java private final ConnectionStateListener listener = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { handleStateChange(newState); } }; private void handleStateChange(ConnectionState newState) { switch ( newState ) { default: { // NOP break; } case RECONNECTED: { try { if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() ) { reset(); } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); log.error(“Could not reset leader latch”, e); setLeadership(false); } break; } case SUSPENDED: { if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) ) { setLeadership(false); } break; } case LOST: { setLeadership(false); break; } } }LeaderLatch注册了一个自定义的ConnectionStateListener,分别在RECONNECTED、SUSPENDED、LOST的时候进行相应处理setLeadership(false)的时候,会根据新旧值通知相应的listener做处理,如果原来是leader,则回调listener.notLeader()对于RECONNECTED状态,如果当前latch不是leader,则调用reset,重新走start过程注册节点小结curator recipes的LeaderLatch给我们提供了leader选举的便利方法,并提供了LeaderLatchListener供自定义处理LeaderLatch使用了zk的EPHEMERAL_SEQUENTIAL,节点名会自动带上编号,默认LOCK_NAME为latch-,另外对于protected的,会自动添加上PROTECTED_PREFIX(c)以及protectedId(UUID),因而最后的节点名的格式为PROTECTED_PREFIX+UUID+LOCK_NAME+编号,类似_c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-0000000045LeaderLatch使用了ConnectionStateListener对自身节点变化进行相应处理,取index为0的节点位leader,对于非leader的还对前一个节点添加watcher针对前一节点删除进行处理,触发checkLeadership操作,重新检查自身的index是否是在children排在第一位,如果是则更新为leader,触发相应操作,如果不是则重新watch前面一个节点。如此一环扣一环的实现显得十分精妙。docLeader LatchApache Curator Leader选举 简单示例基于Apache Curator框架的两种分布式Leader选举策略详解 ...

October 11, 2018 · 5 min · jiezi

Zookeeper 通知更新可靠吗? 解读源码找答案!

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~本文由特鲁门发表于云+社区专栏导读:遇到Keepper通知更新无法收到的问题,思考节点变更通知的可靠性,通过阅读源码解析了解到zk Watch的注册以及触发的机制,本地调试运行模拟zk更新的不可靠的场景以及得出相应的解决方案。过程很曲折,但问题的根本原因也水落石出了,本文最后陈述了更新无法收到的根本原因,希望对其他人有所帮助。—————————————–通常Zookeeper是作为配置存储、分布式锁等功能被使用,配置读取如果每一次都是去Zookeeper server读取效率是非常低的,幸好Zookeeper提供节点更新的通知机制,只需要对节点设置Watch监听,节点的任何更新都会以通知的方式发送到Client端。如上图所示:应用Client通常会连接上某个ZkServer,forPath不仅仅会读取Zk 节点zkNode的数据(通常存储读取到的数据会存储在应用内存中,例如图中Value),而且会设置一个Watch,当zkNode节点有任何更新时,ZkServer会发送notify,Client运行Watch来才走出相应的事件相应。这里假设操作为更新Client本地的数据。这样的模型使得配置异步更新到Client中,而无需Client每次都远程读取,大大提高了读的性能,(图中的re-regist重新注册是因为对节点的监听是一次性的,每一次通知完后,需要重新注册)。但这个Notify是可靠的吗?如果通知失败,那岂不是Client永远都读取的本地的未更新的值?由于现网环境定位此类问题比较困难,因此本地下载源码并模拟运行ZkServer & ZkClient来看通知的发送情况。1、git 下载源码 https://github.com/apache/zookeeper2、cd 到路径下,运行ant eclipse 加载工程的依赖。3、导入Idea中。https://stackoverflow.com/que… 查看相关问题和步骤。首先运行ZkServer。QuorumPeerMain是Server的启动类。这个可以根据bin下ZkServer.sh找到入口。注意启动参数配置参数文件,指定例如启动端口等相关参数。在此之前,需要设置相关的断点。首先我们要看client设置监听后,server是如何处理的ZkClient 是使用Nio的方式与ZkServer进行通信的,Zookeeper的线程模型中使用两个线程:SendThread专门成立的请求的发送,请求会被封装为Packet(包含节点名称、Watch描述等信息)类发送给Sever。EventThread则专门处理SendThread接收后解析出的Event。ZkClient 的主要有两个Processor,一个是SycProcessor负责Cluster之间的数据同步(包括集群leader选取)。另一个是叫FinalRuestProcessor,专门处理对接受到的请求(Packet)进行处理。 //ZookeeperServer 的processPacket方法专门对收到的请求进行处理。 public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { // We have the request, now process and setup for next InputStream bais = new ByteBufferInputStream(incomingBuffer); BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); RequestHeader h = new RequestHeader(); h.deserialize(bia, “header”); // Through the magic of byte buffers, txn will not be // pointing // to the start of the txn incomingBuffer = incomingBuffer.slice(); //鉴权请求处理 if (h.getType() == OpCode.auth) { LOG.info(“got auth packet " + cnxn.getRemoteSocketAddress()); AuthPacket authPacket = new AuthPacket(); ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket); String scheme = authPacket.getScheme(); ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme); Code authReturn = KeeperException.Code.AUTHFAILED; if(ap != null) { try { authReturn = ap.handleAuthentication(new ServerAuthenticationProvider.ServerObjs(this, cnxn), authPacket.getAuth()); } catch(RuntimeException e) { LOG.warn(“Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e); authReturn = KeeperException.Code.AUTHFAILED; } } if (authReturn == KeeperException.Code.OK) { if (LOG.isDebugEnabled()) { LOG.debug(“Authentication succeeded for scheme: " + scheme); } LOG.info(“auth success " + cnxn.getRemoteSocketAddress()); ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); cnxn.sendResponse(rh, null, null); } else { if (ap == null) { LOG.warn(“No authentication provider for scheme: " + scheme + " has " + ProviderRegistry.listProviders()); } else { LOG.warn(“Authentication failed for scheme: " + scheme); } // send a response… ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue()); cnxn.sendResponse(rh, null, null); // … and close connection cnxn.sendBuffer(ServerCnxnFactory.closeConn); cnxn.disableRecv(); } return; } else { if (h.getType() == OpCode.sasl) { Record rsp = processSasl(incomingBuffer,cnxn); ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); cnxn.sendResponse(rh,rsp, “response”); // not sure about 3rd arg..what is it? return; } else { Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); si.setOwner(ServerCnxn.me); // Always treat packet from the client as a possible // local request. setLocalSessionFlag(si); //交给finalRequestProcessor处理 submitRequest(si); } } cnxn.incrOutstandingRequests(h); }FinalRequestProcessor 对请求进行解析,Client连接成功后,发送的exist命令会落在这部分处理逻辑。zkDataBase 由zkServer从disk持久化的数据建立而来,上图可以看到这里就是添加监听Watch的地方。然后我们需要了解到,当Server收到节点更新事件后,是如何触发Watch的。首先了解两个概念,FinalRequestProcessor处理的请求分为两种,一种是事务型的,一种非事务型,exist 的event-type是一个非事物型的操作,上面代码中是对其处理逻辑,对于事物的操作,例如SetData的操作。则在下面代码中处理。 private ProcessTxnResult processTxn(Request request, TxnHeader hdr, Record txn) { ProcessTxnResult rc; int opCode = request != null ? request.type : hdr.getType(); long sessionId = request != null ? request.sessionId : hdr.getClientId(); if (hdr != null) { //hdr 为事物头描述,例如SetData的操作就会被ZkDataBase接管操作, //因为是对Zk的数据存储机型修改 rc = getZKDatabase().processTxn(hdr, txn); } else { rc = new ProcessTxnResult(); } if (opCode == OpCode.createSession) { if (hdr != null && txn instanceof CreateSessionTxn) { CreateSessionTxn cst = (CreateSessionTxn) txn; sessionTracker.addGlobalSession(sessionId, cst.getTimeOut()); } else if (request != null && request.isLocalSession()) { request.request.rewind(); int timeout = request.request.getInt(); request.request.rewind(); sessionTracker.addSession(request.sessionId, timeout); } else { LOG.warn(”***>>>>> Got " + txn.getClass() + " " + txn.toString()); } } else if (opCode == OpCode.closeSession) { sessionTracker.removeSession(sessionId); } return rc; }这里设置了断点,就可以拦截对节点的更新操作。这两个设置了断点,就可以了解到Watch的设置过程。接下来看如何启动Zookeeper的Client。ZookeeperMain为Client的入口,同样在bin/zkCli.sh中可以找到。注意设置参数,设置Server的连接地址。修改ZookeeperMain方法,设置对节点的Watch监听。 public ZooKeeperMain(String args[]) throws IOException, InterruptedException, KeeperException { cl.parseOptions(args); System.out.println(“Connecting to " + cl.getOption(“server”)); connectToZK(cl.getOption(“server”)); while (true) { // 模拟注册对/zookeeper节点的watch监听 zk.exists("/zookeeper”, true); System.out.println(“wait”); } }启动Client。由于我们要观察节点变更的过程,上面这个Client设置了对节点的监听,那么我们需要另外一个cleint对节点进行更改,这个我们只需要在命令上进行就可以了。此时命令行的zkClient更新了/zookeeper节点,Server此时会停在setData事件的处理代码段。 public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { Stat s = new Stat(); DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } byte lastdata[] = null; synchronized (n) { lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } // now update if the path is in a quota subtree. String lastPrefix = getMaxPrefixWithQuota(path); if(lastPrefix != null) { this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)); } //触发watch监听 dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; }此时,我们重点关注的类出现了。WatchManagerpackage org.apache.zookeeper.server;import java.io.PrintWriter;import java.util.HashMap;import java.util.HashSet;import java.util.LinkedHashMap;import java.util.Map;import java.util.Map.Entry;import java.util.Set;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.Watcher.Event.EventType;import org.apache.zookeeper.Watcher.Event.KeeperState;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/ * This class manages watches. It allows watches to be associated with a string * and removes watchers and their watches in addition to managing triggers. */class WatchManager { private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class); //存储path对watch的关系 private final Map<String, Set<Watcher>> watchTable = new HashMap<String, Set<Watcher>>(); //存储watch监听了哪些path节点 private final Map<Watcher, Set<String>> watch2Paths = new HashMap<Watcher, Set<String>>(); synchronized int size(){ int result = 0; for(Set<Watcher> watches : watchTable.values()) { result += watches.size(); } return result; } //添加监听 synchronized void addWatch(String path, Watcher watcher) { Set<Watcher> list = watchTable.get(path); if (list == null) { // don’t waste memory if there are few watches on a node // rehash when the 4th entry is added, doubling size thereafter // seems like a good compromise list = new HashSet<Watcher>(4); watchTable.put(path, list); } list.add(watcher); Set<String> paths = watch2Paths.get(watcher); if (paths == null) { // cnxns typically have many watches, so use default cap here paths = new HashSet<String>(); watch2Paths.put(watcher, paths); } paths.add(path); } //移除 synchronized void removeWatcher(Watcher watcher) { Set<String> paths = watch2Paths.remove(watcher); if (paths == null) { return; } for (String p : paths) { Set<Watcher> list = watchTable.get(p); if (list != null) { list.remove(watcher); if (list.size() == 0) { watchTable.remove(p); } } } } Set<Watcher> triggerWatch(String path, EventType type) { return triggerWatch(path, type, null); } //触发watch Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); Set<Watcher> watchers; synchronized (this) { watchers = watchTable.remove(path); if (watchers == null || watchers.isEmpty()) { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, “No watchers for " + path); } return null; } for (Watcher w : watchers) { Set<String> paths = watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } //通知发送 w.process(e); } return watchers; }}重点关注triggerWatch的方法,可以发现watch被移除后,即往watch中存储的client信息进行通知发送。 @Override public void process(WatchedEvent event) { ReplyHeader h = new ReplyHeader(-1, -1L, 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, “Deliver event " + event + " to 0x” + Long.toHexString(this.sessionId) + " through " + this); } // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); sendResponse(h, e, “notification”); }没有任何确认机制,不会由于发送失败,而回写watch。结论:到这里,可以知道watch的通知机制是不可靠的,zkServer不会保证通知的可靠抵达。虽然zkclient与zkServer端是会有心跳机制保持链接,但是如果通知过程中断开,即时重新建立连接后,watch的状态是不会恢复。现在已经知道了通知是不可靠的,会有丢失的情况,那ZkClient的使用需要进行修正。本地的存储不再是一个静态的等待watch更新的状态,而是引入缓存机制,定期的去从Zk主动拉取并注册Watch(ZkServer会进行去重,对同一个Node节点的相同时间类型的Watch不会重复)。另外一种方式是,Client端收到断开连接的通知,重新注册所有关注节点的Watch。但作者遇到的现网情况是client没有收到更新通知的同时,也没有查看到连接断开的错误信息。这块仍需进一步确认。水平有限,欢迎指正 :D在StackOverFlow上的提问有了新进展:https://stackoverflow.com/que…原来官方文档已经解释了在连接断开的时候,client对watch的一些恢复操做,ps:原来上面我提到的客户端的策略已经官方实现。。。客户端会通过心跳保活,如果发现断开了连接,会重新建立连接,并发送之前对节点设置的watch以及节点zxid,如果zxid与服务端的小则说明断开期间有更改,那么server会触发通知。这么来看,Zookeeper的通知机制至少在官方的文档说明上是可靠的,至少是有相应机制去保证。ps:除Exist watch外。但是本人遇到的问题仍未解开。。后悔当初没有保留现场,深入发掘。计划先把实现改回原来的,后续进一步验证。找到原因再更新这里。最终结论更新!通过深入阅读apache的zk论坛以及源码,有一个重要的信息。上面提到的连接断开分为recoverble以及unrecoverble两种场景,这两种的区别主要是基于Session的有效期,所有的client操作包括watch都是和Session关联的,当Session在超时过期时间内,重新成功建立连接,则watch会在连接建立后重新设置。但是当Session Timeout后仍然没有成功重新建立连接,那么Session则处于Expire的状态。下面连接讲述了这个过程How should I handle SESSION_EXPIRED?这种情况下,ZookeeperClient会重新连接,但是Session将会是全新的一个。同时之前的状态是不会保存的。 private void conLossPacket(Packet p) { if (p.replyHeader == null) { return; } switch (state) { case AUTH_FAILED: p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue()); break; case CLOSED: // session关闭状态,直接返回。 p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue()); break; default: p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue()); } // 如果session未过期,这里进行session的状态(watches)会重新注册。 finishPacket(p); }*1、什么是zookeeper的会话过期?*一般来说,我们使用zookeeper是集群形式,如下图,client和zookeeper集群(3个实例)建立一个会话session。在这个会话session当中,client其实是随机与其中一个zk provider建立的链接,并且互发心跳heartbeat。zk集群负责管理这个session,并且在所有的provider上维护这个session的信息,包括这个session中定义的临时数据和监视点watcher。如果再网络不佳或者zk集群中某一台provider挂掉的情况下,有可能出现connection loss的情况,例如client和zk provider1连接断开,这时候client不需要任何的操作(zookeeper api已经给我们做好了),只需要等待client与其他provider重新连接即可。这个过程可能导致两个结果:1)在session timeout之内连接成功这个时候client成功切换到连接另一个provider例如是provider2,由于zk在所有的provider上同步了session相关的数据,此时可以认为无缝迁移了。2)在session timeout之内没有重新连接这就是session expire的情况,这时候zookeeper集群会任务会话已经结束,并清除和这个session有关的所有数据,包括临时节点和注册的监视点Watcher。在session超时之后,如果client重新连接上了zookeeper集群,很不幸,zookeeper会发出session expired异常,且不会重建session,也就是不会重建临时数据和watcher。我们实现的ZookeeperProcessor是基于Apache Curator的Client封装实现的。Apache Curator 错误处理机制它对于Session Expire的处理是提供了处理的监听注册ConnectionStateListner,当遇到Session Expire时,执行使用者要做的逻辑。(例如:重新设置Watch)遗憾的是,我们没有对这个事件进行处理,因此连接是一致断开的,但是!我们应用仍然会读到老的数据!在这里,我们又犯了另外一个错误,本地缓存了zookeeper的节点数据。。其实zookeeperClient已经做了本地缓存的机制,但是我们有加了一层(注:这里也有一个原因,是因为zk节点的数据时二进制的数组,业务要使用通常要反序列化,我们这里的缓存是为了减少反序列化带来的开销!),正式由于我们本地缓存了,因此即使zk断开了,仍然读取了老的值!至此,谜团已经全部解开,看来之前的实现有许多姿势是错误的,导致后续出现了各种奇怪的BUG 。现在处理的方案,是监听Reconnect的通知,当收到这个通知后,主动让本地缓存失效(这里仍然做了缓存,是因为减少反序列化的开销,zkClient的缓存只是缓存了二进制,每次拿出来仍然需要反序列化)。代码: curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { switch (newState) { case CONNECTED: break; case RECONNECTED: LOG.error(“zookeeper connection reconnected”); System.out.println(“zookeeper connection reconnected”); //本来使用invalidateAll,但是这个会使得cache所有缓存值同时失效 //如果关注节点比较多,导致同时请求zk读值,可能服务会瞬时阻塞在这一步 //因此使用guava cache refresh方法,异步更新,更新过程中, //老值返回,知道更新完成 for (String key : classInfoMap.keySet()) { zkDataCache.refresh(key); } break; case LOST: // session 超时,断开连接,这里不要做任何操作,缓存保持使用 LOG.error(“zookeeper connection lost”); System.out.println(“zookeeper connection lost”); break; case SUSPENDED: break; default: break; } } });问答如何阅读Zookeeper事务日志?相关阅读Zookeeper总览ZooKeeper入门zookeeper原理 【每日课程推荐】机器学习实战!快速入门在线广告业务及CTR相应知识 ...

October 10, 2018 · 5 min · jiezi

Kafka源码系列之源码分析zookeeper在kafka的作用

以kafka0.8.2.2源码为例给大家进行讲解的。纯属个人爱好,希望大家对不足之处批评指正。__一,zookeeper在分布式集群的作用____1,数据发布与订阅(配置中心)__发布与订阅模型,即所谓的配置中心,顾名思义就是讲发布者将数据发布到zk节点上,共订阅者动态获取数据,实现配置的集中式管理和动态更新。例如,全局的配置信息,服务服务框架的地址列表就非常适合使用。__2,负载均衡__即软件负载均衡。最典型的是消息中间件的生产、消费者负载均衡。 ...

May 30, 2018 · 1 min · jiezi