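Author: JD Logistics, Liang Jichao

ZooKeeper is a distributed service framework. It mainly solves common data problems in distributed applications, such as cluster management and state synchronization. To solve these problems, ZooKeeper relies on leader election to guarantee strong data consistency and stability. This article uses a concrete cluster configuration to walk through the leader election source code. It shows how ZooKeeper uses BIO (blocking I/O) communication, multiple threads and multi-layer queues to build a high-performance architecture.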
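01 Leader Election Mechanism

Leader election uses a majority ("more than half") voting algorithm.

Each ZooKeeper server is called a node. Every node that has voting rights casts a vote and sends it to every other node that has voting rights. When one candidate receives votes from more than half of the voting nodes, that node becomes the Leader and the other nodes become Followers.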
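The "more than half" rule can be sketched as a single comparison. This is a minimal illustration that assumes only voting members (participants) are counted and observers are excluded. The class name `QuorumCheck` is hypothetical and is not part of ZooKeeper's API. The `half` computation mirrors `half = votingMembers.size() / 2` in the `QuorumMaj` code shown later.

```java
// Hypothetical helper illustrating the majority rule; not ZooKeeper's API.
class QuorumCheck {
    // A candidate wins when its votes exceed half of the voting members (observers excluded).
    static boolean hasQuorum(int votesForCandidate, int votingMembers) {
        int half = votingMembers / 2; // integer division, like half in QuorumMaj
        return votesForCandidate > half;
    }

    public static void main(String[] args) {
        System.out.println(hasQuorum(2, 3)); // true: 2 > 1
        System.out.println(hasQuorum(1, 3)); // false: 1 is not > 1
        System.out.println(hasQuorum(2, 4)); // false: 4 voters need at least 3 votes
    }
}
```

With 3 voting members, 2 matching votes are enough. With 4 voting members, 3 are needed, which is one reason clusters usually use an odd number of voters.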
02 Leader Election Cluster Configuration

1. Create zoo1.cfg, zoo2.cfg, zoo3.cfg and zoo4.cfg by copying and renaming zoo_sample.cfg.

2. Edit each file and set the following values:

```plain
# zoo1.cfg
dataDir=/export/data/zookeeper-1
clientPort=2181
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer

# zoo2.cfg
dataDir=/export/data/zookeeper-2
clientPort=2182
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer

# zoo3.cfg
dataDir=/export/data/zookeeper-3
clientPort=2183
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer

# zoo4.cfg
dataDir=/export/data/zookeeper-4
clientPort=2184
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
```

3. Each server entry has the form server.N=ip:data sync port:election port:role flag.
   - N is the server number and must match the content of that server's myid file.
   - participant (takes part in elections) is the default and may be omitted.
   - observer means the server does not take part in elections.

4. Create a myid file in each of /export/data/zookeeper-1, /export/data/zookeeper-2, /export/data/zookeeper-3 and /export/data/zookeeper-4. The files contain 1, 2, 3 and 4 respectively, and this value becomes the node's sid (Server ID).
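The configuration steps above can be checked with a small loader. This sketch reads one of the .cfg files with `java.util.Properties`, takes the sid from `dataDir/myid`, and splits the `server.N` entries into voting and observing members based on the optional role flag. The class `ClusterConfig` and its fields are hypothetical. It is a simplified stand-in, not ZooKeeper's `QuorumPeerConfig`, and it ignores other syntax ZooKeeper accepts (for example client ports appended after `;`).

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical, simplified loader; not ZooKeeper's QuorumPeerConfig.
class ClusterConfig {
    long mySid;                                              // read from dataDir/myid
    final TreeMap<Long, String> voting = new TreeMap<>();    // participant entries
    final TreeMap<Long, String> observing = new TreeMap<>(); // observer entries

    static ClusterConfig load(Path cfgFile) throws IOException {
        Properties props = new Properties();
        try (Reader r = Files.newBufferedReader(cfgFile, StandardCharsets.UTF_8)) {
            props.load(r); // "server.1=127.0.0.1:2001:3001" -> key "server.1"
        }
        String dataDir = props.getProperty("dataDir");
        if (dataDir == null) throw new IOException("dataDir is not set in " + cfgFile);

        ClusterConfig c = new ClusterConfig();
        byte[] myid = Files.readAllBytes(Paths.get(dataDir, "myid"));
        c.mySid = Long.parseLong(new String(myid, StandardCharsets.UTF_8).trim());

        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith("server.")) continue;
            long sid = Long.parseLong(key.substring("server.".length()));
            String value = props.getProperty(key);
            String[] parts = value.split(":"); // ip:syncPort:electionPort[:role]
            if (parts.length < 3) throw new IOException("bad server entry: " + key);
            String role = parts.length > 3 ? parts[3] : "participant"; // default role
            if (role.equals("observer")) c.observing.put(sid, value);
            else c.voting.put(sid, value);
        }
        return c;
    }

    // A quorum needs strictly more votes than this.
    int half() { return voting.size() / 2; }

    public static void main(String[] args) throws IOException {
        ClusterConfig c = load(Paths.get(args[0])); // e.g. conf/zoo2.cfg
        System.out.println("sid=" + c.mySid + " voting=" + c.voting.keySet()
                + " observing=" + c.observing.keySet() + " half=" + c.half());
    }
}
```

For zoo2.cfg plus a myid file containing 2, the loader should report sid 2, voting members 1, 2 and 3, observer 4, and half = 1.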
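Start three ZooKeeper instances:

```plain
bin/zkServer.sh start conf/zoo1.cfg
bin/zkServer.sh start conf/zoo2.cfg
bin/zkServer.sh start conf/zoo3.cfg
```

Each instance reads the configuration file passed as its startup argument. From it, the instance learns its own server identity (its sid) and how many instances take part in the election.

03 Leader Election Process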
Figure 1: Voting flow from round 1 to round 2
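Assumptions:

The vote format is vote(sid, zxid, epoch).

- sid is the Server ID, the unique identifier of each server, taken from the myid file.
- zxid is the data transaction ID.
- epoch is the election epoch. To keep the explanation simple, epoch is fixed at 1 (the first election) and omitted below, so votes are written as vote(sid, zxid).

The nodes sid=1 and sid=2 are started in that order.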
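Round 1:

Node sid=1: its initial vote is for itself, and it sends vote(1,0) to node sid=2.

Node sid=2: its initial vote is for itself, and it sends vote(2,0) to node sid=1.

Node sid=1: it receives vote(2,0) from node sid=2 and compares it with its own vote(1,0). It first compares zxid. A larger zxid means more recent data, so the vote with the largest zxid wins. If the zxids are equal, the larger sid wins. The result is vote(2,0), so node sid=1 changes its vote to vote(2,0).

Node sid=2: it receives vote(1,0) from node sid=1 and compares it with its own vote(2,0). By the same rule the result is vote(2,0), so its vote is unchanged.

Round 1 ends.

Round 2: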
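The comparison used in round 1 can be written as a small predicate. This sketch models the article's vote(sid, zxid, epoch) with a hypothetical `SimpleVote` class; it is not ZooKeeper's `Vote`. As we understand it, ZooKeeper's `FastLeaderElection.totalOrderPredicate` also checks the epoch before zxid and sid. That is why epoch is compared first here, even though the walkthrough holds it fixed.

```java
// Simplified model of vote(sid, zxid, epoch); not ZooKeeper's Vote class.
final class SimpleVote {
    final long sid;   // proposed leader
    final long zxid;  // proposed leader's last transaction id
    final long epoch; // election epoch (fixed at 1 in the walkthrough)

    SimpleVote(long sid, long zxid, long epoch) {
        this.sid = sid;
        this.zxid = zxid;
        this.epoch = epoch;
    }

    // True if `incoming` should replace `current`:
    // higher epoch first, then higher zxid (newer data), then higher sid.
    static boolean beats(SimpleVote incoming, SimpleVote current) {
        if (incoming.epoch != current.epoch) return incoming.epoch > current.epoch;
        if (incoming.zxid != current.zxid) return incoming.zxid > current.zxid;
        return incoming.sid > current.sid;
    }

    @Override
    public String toString() {
        return "vote(" + sid + "," + zxid + ")";
    }

    public static void main(String[] args) {
        SimpleVote v1 = new SimpleVote(1, 0, 1);
        SimpleVote v2 = new SimpleVote(2, 0, 1);
        // sid=1 receives vote(2,0): same zxid, larger sid wins, so it switches
        SimpleVote node1 = beats(v2, v1) ? v2 : v1;
        // sid=2 receives vote(1,0): its own vote(2,0) still wins
        SimpleVote node2 = beats(v1, v2) ? v1 : v2;
        System.out.println(node1 + " " + node2); // vote(2,0) vote(2,0)
    }
}
```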
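Node sid=1: its current vote is vote(2,0), and it sends vote(2,0) to node sid=2.

Node sid=2: its current vote is vote(2,0), and it sends vote(2,0) to node sid=1.

Node sid=1: it receives vote(2,0) from node sid=2, and its own vote is also vote(2,0). Three nodes take part in the election, and 2 of them now hold the same vote, which is more than half. Under the majority rule, node sid=1 therefore elects node sid=2 as Leader and becomes a Follower.

Node sid=2: it receives vote(2,0) from node sid=1, and its own vote is also vote(2,0). Under the majority rule it elects node sid=2 as Leader and becomes the Leader.

When node sid=3 starts afterwards, the cluster has already elected a leader. Nodes sid=1 and sid=2 send their leader vote to node sid=3, and the majority result is still that node sid=2 is the leader.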
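The two rounds above can be replayed with a small lock-step simulation. It assumes the walkthrough's simplifications: fixed epoch, all zxids 0, 3 voting members of which only sid=1 and sid=2 are online, and a majority check on the votes broadcast in each round. The class `ElectionRounds` is hypothetical. The real `FastLeaderElection` is event-driven rather than organized into rounds, so treat this as an illustration of the rules only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical lock-step simulation of the walkthrough; not ZooKeeper code.
class ElectionRounds {
    // A vote is {proposedSid, zxid}; the epoch is fixed, as in the walkthrough.
    static long[] pick(long[] a, long[] b) {
        if (a[1] != b[1]) return a[1] > b[1] ? a : b; // newer data (larger zxid) wins
        return a[0] >= b[0] ? a : b;                  // otherwise the larger sid wins
    }

    // Returns {leaderSid, round}, or {-1, -1} if no majority forms within maxRounds.
    static long[] elect(long[] online, int votingMembers, int maxRounds) {
        Map<Long, long[]> vote = new HashMap<>();
        for (long sid : online) vote.put(sid, new long[] {sid, 0}); // each node votes for itself
        for (int round = 1; round <= maxRounds; round++) {
            Map<Long, long[]> sent = new HashMap<>(vote);            // votes broadcast this round
            Map<Long, Integer> tally = new HashMap<>();
            for (long[] v : sent.values()) tally.merge(v[0], 1, Integer::sum);
            for (Map.Entry<Long, Integer> e : tally.entrySet()) {
                if (e.getValue() > votingMembers / 2) return new long[] {e.getKey(), round};
            }
            for (long me : online) {                                 // adopt the best vote seen
                long[] best = sent.get(me);
                for (long other : online) best = pick(best, sent.get(other));
                vote.put(me, best);
            }
        }
        return new long[] {-1, -1};
    }

    public static void main(String[] args) {
        long[] r = elect(new long[] {1, 2}, 3, 5);
        System.out.println("leader=" + r[0] + " round=" + r[1]); // leader=2 round=2
    }
}
```

The sketch does not model the late joiner: when sid=3 starts after a leader exists, the other nodes report the existing leader to it, which is why sid=2 stays leader in the article's scenario.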
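3.1 Leader Election Uses a Multi-Layer Queue Architecture

Under the hood, ZooKeeper's election is split into two layers: an election application layer and a message transport queue layer.

- The first layer, the application layer, uses queues that receive and send all votes in one place.
- The second layer, the transport layer, keeps a separate queue for each server sid. This prevents sends to different servers from interfering with each other. For example, failing to send to one machine does not affect sending to the healthy servers.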
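The two-layer idea can be sketched with standard `java.util.concurrent` queues. One `LinkedBlockingQueue` plays the application layer. A `ConcurrentHashMap` of small `ArrayBlockingQueue`s, one per destination sid, plays the transport layer. The field names echo the article (`sendqueue`, `queueSendMap`), but the class, the `dispatchOne` step and the drop-the-oldest policy are assumptions made for illustration, not ZooKeeper's exact code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical two-layer queue sketch; names echo the article, but this is not ZooKeeper code.
class TwoLayerQueues {
    // An outgoing vote addressed to one server
    static final class ToSend {
        final long sid;
        final String vote;
        ToSend(long sid, String vote) { this.sid = sid; this.vote = vote; }
    }

    // Layer 1 (application): a single queue for every outgoing vote
    final BlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<>();
    // Layer 2 (transport): one small bounded queue per destination sid
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();

    // One step of a WorkerSender-style loop: move a vote from layer 1 to its sid's queue.
    void dispatchOne() throws InterruptedException {
        ToSend m = sendqueue.take();
        ArrayBlockingQueue<ByteBuffer> q =
                queueSendMap.computeIfAbsent(m.sid, k -> new ArrayBlockingQueue<>(1));
        ByteBuffer buf = ByteBuffer.wrap(m.vote.getBytes(StandardCharsets.UTF_8));
        // If this peer is not draining its queue, drop the stale vote instead of blocking others
        while (!q.offer(buf)) {
            q.poll();
        }
    }

    // Reads the head of a per-sid queue without consuming it
    static String peek(ArrayBlockingQueue<ByteBuffer> q) {
        return StandardCharsets.UTF_8.decode(q.peek().duplicate()).toString();
    }

    public static void main(String[] args) throws InterruptedException {
        TwoLayerQueues t = new TwoLayerQueues();
        t.sendqueue.put(new ToSend(3, "vote(1,0)")); // sid=3 is down: nobody drains its queue
        t.sendqueue.put(new ToSend(3, "vote(2,0)"));
        t.sendqueue.put(new ToSend(2, "vote(2,0)"));
        for (int i = 0; i < 3; i++) t.dispatchOne();
        // vote(2,0) vote(2,0): sid=3 keeps only its latest vote, sid=2 is unaffected
        System.out.println(peek(t.queueSendMap.get(3L)) + " " + peek(t.queueSendMap.get(2L)));
    }
}
```

Because each sid has its own bounded queue, a peer that is down only backs up its own queue. The dispatcher thread and the other peers keep moving.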
Figure 2: Interaction flow between the upper and lower queue layers
04 Entry Class for the Code Analysis

By reading zkServer.sh, we can find the server startup class:
org.apache.zookeeper.server.quorum.QuorumPeerMain
05 Code Walkthrough of the Election Process

Figure 3: Flowchart of the election code
Load the configuration file with QuorumPeerConfig.parse(path). The key configuration for leader election is as follows:
- Read the myid file in the dataDir directory and set the application's sid, which serves as its identity as a voter. In the code below, the myid variable is the node's own sid.
- Set peerType, which determines whether this application takes part in the election.
- new QuorumMaj() parses the entries with the server. prefix to load cluster membership:
  - allMembers: all members;
  - votingMembers: members that take part in the election;
  - observingMembers: observers.

  It also sets half = votingMembers.size() / 2.

```java
public QuorumMaj(Properties props) throws ConfigException {
    for (Entry<Object, Object> entry : props.entrySet()) {
        String key = entry.getKey().toString();
        String value = entry.getValue().toString();
        // Read the "server."-prefixed instance entries from the cluster config
        if (key.startsWith("server.")) {
            int dot = key.indexOf('.');
            long sid = Long.parseLong(key.substring(dot + 1));
            QuorumServer qs = new QuorumServer(sid, value);
            allMembers.put(Long.valueOf(sid), qs);
            if (qs.type == LearnerType.PARTICIPANT) // role PARTICIPANT: takes part in the election
                votingMembers.put(Long.valueOf(sid), qs);
            else {
                // Observer member
                observingMembers.put(Long.valueOf(sid), qs);
            }
        } else if (key.equals("version")) {
            version = Long.parseLong(value, 16);
        }
    }
    // Majority base: a quorum needs more than half votes
    half = votingMembers.size() / 2;
}
```

QuorumPeerMain.runFromConfig(config) starts the server. QuorumPeer.startLeaderElection() then starts the election service and sets the current vote to new Vote(sid, zxid, epoch):

```java
synchronized public void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // First round: the node votes for itself by default
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    // ........
}
```

Next, the election connection manager QuorumCnxManager is created. It:

- initializes recvQueue<Message(sid, ByteBuffer)>, the queue of received votes (second-layer transport queue);
- initializes queueSendMap<sid, queue>, the per-sid queues of votes to send (second-layer transport queues);
- initializes senderWorkerMap<sid, SendWorker>, the container of send worker threads; an entry means a connection to that sid is established;
- initializes the election listener thread QuorumCnxManager.Listener.

```java
// QuorumPeer.createCnxnManager()
public QuorumCnxManager(QuorumPeer self, final long mySid,
                        Map<Long, QuorumPeer.QuorumServer> view,
                        QuorumAuthServer authServer, QuorumAuthLearner authLearner,
                        int socketTimeout, boolean listenOnAllIPs,
                        int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {
    // Vote receive queue (second-layer transport queue)
    this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
    // Per-sid vote send queues (second-layer transport queues)
    this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
    // Send worker threads; an entry means a connection to that sid is established
    this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
    this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();

    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if (cnxToValue != null) {
        this.cnxTO = Integer.parseInt(cnxToValue);
    }

    this.self = self;
    this.mySid = mySid;
    this.socketTimeout = socketTimeout;
    this.view = view;
    this.listenOnAllIPs = listenOnAllIPs;

    initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize, quorumSaslAuthEnabled);

    // Starts listener thread that waits for connection requests
    // (the election listener thread that accepts vote connections)
    listener = new Listener();
    listener.setName("QuorumPeerListener");
}

// QuorumPeer.createElectionAlgorithm
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = createCnxnManager(); // new QuorumCnxManager(... new Listener())
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start(); // start the election listener thread
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
```

The election listener thread QuorumCnxManager.Listener is then started. It creates a ServerSocket, waits for connections from other nodes, and stores each connection in senderWorkerMap<sid, SendWorker>. Only connections from nodes whose sid is greater than self.sid are kept. As handleConnection shows, when a node with a smaller sid connects, the local node closes that socket and dials back itself. Each pair of nodes therefore ends up with one connection, initiated by the larger sid.

```java
// Runs after listener.start() above
public void run() {
    int numRetries = 0;
    InetSocketAddress addr;
    Socket client = null;
    while ((!shutdown) && (numRetries < 3)) {
        try {
            ss = new ServerSocket();
            ss.setReuseAddress(true);
            if (self.getQuorumListenOnAllIPs()) {
                int port = self.getElectionAddress().getPort();
                addr = new InetSocketAddress(port);
            } else {
                // Resolve hostname for this server in case the
                // underlying ip address has changed.
                self.recreateSocketAddresses(self.getId());
                addr = self.getElectionAddress();
            }
            LOG.info("My election bind port: " + addr.toString());
            setName(addr.toString());
            ss.bind(addr);
            while (!shutdown) {
                client = ss.accept();
                setSockOpts(client);
                LOG.info("Received connection request " + client.getRemoteSocketAddress());
                // Receive and handle the connection request
                // asynchronously if the quorum sasl authentication is
                // enabled. This is required because sasl server
                // authentication process may take few seconds to finish,
                // this may delay next peer connection requests.
                if (quorumSaslAuthEnabled) {
                    receiveConnectionAsync(client);
                } else {
                    // Receive the connection
                    receiveConnection(client);
                }
                numRetries = 0;
            }
        } catch (IOException e) {
            if (shutdown) {
                break;
            }
            LOG.error("Exception while listening", e);
            numRetries++;
            try {
                ss.close();
                Thread.sleep(1000);
            } catch (IOException ie) {
                LOG.error("Error closing server socket", ie);
            } catch (InterruptedException ie) {
                LOG.error("Interrupted while sleeping. " + "Ignoring exception", ie);
            }
            closeSocket(client);
        }
    }
    LOG.info("Leaving listener");
    if (!shutdown) {
        LOG.error("As I'm leaving the listener thread, "
                + "I won't be able to participate in leader "
                + "election any longer: "
                + self.getElectionAddress());
    } else if (ss != null) {
        // Clean up for shutdown.
        try {
            ss.close();
        } catch (IOException ie) {
            // Don't log an error for shutdown.
            LOG.debug("Error closing server socket", ie);
        }
    }
}

// Call path: receiveConnection() -> handleConnection(...)
private void handleConnection(Socket sock, DataInputStream din) throws IOException {
    // ... omitted
    if (sid < self.getId()) {
        /*
         * This replica might still believe that the connection to sid is
         * up, so we have to shut down the workers before trying to open a
         * new connection.
         */
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }

        /*
         * Now we start a new connection
         */
        LOG.debug("Create new connection to server: {}", sid);
        closeSocket(sock);

        if (electionAddr != null) {
            connectOne(sid, electionAddr);
        } else {
            connectOne(sid);
        }
    } else {
        // Otherwise start worker threads to receive data.
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);
        if (vsw != null) {
            vsw.finish();
        }
        // Store the connection as <sid, SendWorker>
        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

        sw.start();
        rw.start();
    }
}
```

Next, the FastLeaderElection service is created. It initializes the vote send queue sendqueue (first layer) and the vote receive queue recvqueue (first layer). It also creates the WorkerSender and WorkerReceiver threads:

```java
// FastLeaderElection.starter
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;
    // Send queue sendqueue (first-layer queue)
    sendqueue = new LinkedBlockingQueue<ToSend>();
    // Receive queue recvqueue (first-layer queue)
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

// new Messenger(manager)
Messenger(QuorumCnxManager manager) {
    // Create the WorkerSender thread
    this.ws = new WorkerSender(manager);
    this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);
    // Create the WorkerReceiver thread
    this.wr = new WorkerReceiver(manager);
    this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}
```

Then the WorkerSender and WorkerReceiver threads are started. The WorkerSender thread spins, taking elements from the first-layer queue sendqueue
...
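The sid check in `handleConnection` above can be reduced to a tiny decision rule, which makes it easier to see why only one connection survives per pair of nodes. This is a hypothetical reduction for illustration. `ConnectionRule`, `Action` and `survivingInitiator` are invented names, not ZooKeeper code. It only mirrors the `if (sid < self.getId())` branch and ignores sockets, workers and authentication.

```java
// Hypothetical reduction of the sid check in handleConnection; not ZooKeeper code.
class ConnectionRule {
    enum Action { KEEP_INCOMING, CLOSE_AND_DIAL_BACK }

    // Mirrors `if (sid < self.getId())`: a smaller remote sid is closed and dialed back.
    static Action onIncoming(long remoteSid, long mySid) {
        return remoteSid < mySid ? Action.CLOSE_AND_DIAL_BACK : Action.KEEP_INCOMING;
    }

    // If node a dials node b, returns the sid whose outgoing connection is kept.
    static long survivingInitiator(long a, long b) {
        // b keeps the socket only if a >= b; otherwise b closes it and dials a instead,
        // and a then keeps b's connection because b > a.
        return onIncoming(a, b) == Action.KEEP_INCOMING ? a : b;
    }

    public static void main(String[] args) {
        System.out.println(onIncoming(1, 2));          // CLOSE_AND_DIAL_BACK
        System.out.println(onIncoming(3, 2));          // KEEP_INCOMING
        System.out.println(survivingInitiator(1, 3)); // 3
    }
}
```

Whichever side dials first, the surviving connection is always initiated by the larger sid. This avoids two redundant sockets between the same pair of election ports.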