CXYVIP官网源码交易平台_网站源码_商城源码_小程序源码平台-丞旭猿论坛
CXYVIP官网源码交易平台_网站源码_商城源码_小程序源码平台-丞旭猿论坛
CXYVIP官网源码交易平台_网站源码_商城源码_小程序源码平台-丞旭猿论坛

zookeeper选举时间(一篇读懂)利用zookeeper选主,zookeeper的Leader选举源码解析,源码交易平台,

1.zookeeper选举规则

Tech导读 zookeeper是一个分布式服务框架,主要解决分布式应用中常见的多种数据问题,例如集群管理,状态同步等为解决这些问题zookeeper需要Leader选举进行保障数据的强一致性机制和稳定性。

2.zookeeper leader选举过程

本文通过集群的配置,对leader选举源进行解析,让读者们了解如何利用BIO通信机制,多线程多层队列实现高性能架构01Leader选举机制 在今年的敏捷团队建设中,我通过Suite执行器实现了一键自动化单元测试。

3.zookeeper选主过程

Juint除了Suite执行器还有哪些执行器呢?由此我的Runner探索之旅开始了! Leader选举机制采用半数选举算法 每一个zookeeper服务端称之为一个节点,每个节点都有投票权,把其选票投向每一个有选举权的节点,当其中一个节点选举出票数过半,这个节点就会成为Leader,其它节点成为Follower。

4.zookeeper集群选举机制

02Leader选举集群配置 理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目标页面展示到屏幕。

5.zookeeper选举端口号

1. 重命名zoo_sample.cfg文件为zoo1.cfg ,zoo2.cfg,zoo3.cfg,zoo4.cfg2. 修改zoo.cfg文件,修改值如下:【plain】zoo1.cfg文件内容:dataDir=/export/data/zookeeper-1

6.zookeeper的选举过程

clientPort=2181server.1=127.0.0.1:2001:3001server.2=127.0.0.1:2002:3002:participantserver.3=127.0.0.1:2003:3003:participant

7.zookeeper选举leader机制

server.4=127.0.0.1:2004:3004:observerzoo2.cfg文件内容:dataDir=/export/data/zookeeper-2clientPort=2182server.1=127.0.0.1:2001:3001

8.zookeeper选举流程

server.2=127.0.0.1:2002:3002:participantserver.3=127.0.0.1:2003:3003:participantserver.4=127.0.0.1:2004:3004:observer

9.zookeeper的选举机制

zoo3.cfg文件内容:dataDir=/export/data/zookeeper-3clientPort=2183server.1=127.0.0.1:2001:3001server.2=127.0.0.1:2002:3002:participant

10.zookeeper选举模式

server.3=127.0.0.1:2003:3003:participantserver.4=127.0.0.1:2004:3004:observerzoo4.cfg文件内容:dataDir=/export/data/zookeeper-4

clientPort=2184server.1=127.0.0.1:2001:3001server.2=127.0.0.1:2002:3002:participantserver.3=127.0.0.1:2003:3003:participant

server.4=127.0.0.1:2004:3004:observer3. server.第几号服务器(对应myid文件内容)=ip:数据同步端口:选举端口:选举标识participant默认参与选举标识,可不写. observer不参与选举

4.在/export/data/zookeeper-1,/export/data/zookeeper-2,/export/data/zookeeper-3,/export/data/zookeeper-4目录下创建myid文件,文件内容分别写1 ,2,3,4,用于标识sid(全称:Server ID)赋值。

5. 启动三个zookeeper实例:bin/zkServer.sh start conf/zoo1.cfgbin/zkServer.sh start conf/zoo2.cfgbin/zkServer.sh start conf/zoo3.cfg

6. 每启动一个实例,都会读取启动参数配置zoo.cfg文件,这样实例就可以知道其作为服务端身份信息sid以及集群中有多少个实例参与选举03Leader选举流程 理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目标页面展示到屏幕。

图1 第一轮到第二轮投票流程前提:设定票据数据格式vote(sid,zxid,epoch)sid是Server ID每台服务的唯一标识,是myid文件内容;zxid是数据事务id号;epoch为选举周期,为方便理解下面讲解内容暂定为1初次选举,不写入下面内容里。

按照顺序启动sid=1,sid=2节点第一轮投票:1. sid=1节点:初始选票为自己,将选票vote(1,0)发送给sid=2节点;2. sid=2节点:初始选票为自己,将选票vote(2,0)发送给sid=1节点;

3. sid=1节点:收到sid=2节点选票vote(2,0)和当前自己的选票vote(1,0),首先比对zxid值,zxid越大代表数据最新,优先选择zxid最大的选票,如果zxid相同,选举最大sid。

当前投票选举结果为vote(2,0),sid=1节点的选票变为vote(2,0);4. sid=2节点:收到sid=1节点选票vote(1,0)和当前自己的选票vote(2,0),参照上述选举方式,选举结果为vote(2,0),sid=2节点的选票不变;

5. 第一轮投票选举结束第二轮投票:1. sid=1节点:当前自己的选票为vote(2,0),将选票vote(2,0)发送给sid=2节点;2. sid=2节点:当前自己的选票为vote(2,0),将选票vote(2,0)发送给sid=1节点;。

3. sid=1节点:收到sid=2节点选票vote(2,0)和自己的选票vote(2,0), 按照半数选举算法,总共3个节点参与选举,已有2个节点选举出相同选票,推举sid=2节点为Leader,自己角色变为Follower;

4. sid=2节点:收到sid=1节点选票vote(2,0)和自己的选票vote(2,0),按照半数选举算法推举sid=2节点为Leader,自己角色变为Leader 这时启动sid=3节点后,集群里已经选举出leader,sid=1和sid=2节点会将自己的leader选票发回给sid=3节点,通过半数选举结果还是sid=2节点为leader。

3.1 Leader选举采用多层队列架构 zookeeper选举底层主要分为选举应用层和消息传输队列层,第一层应用层队列统一接收和发送选票,而第二层传输层队列,是按照服务端sid分成了多个队列,是为了避免给每台服务端发送消息互相影响。

比如对某台机器发送不成功不会影响正常服务端的发送

图2 多层队列上下关系交互流程图04解析代码入口类 理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目标页面展示到屏幕。

通过查看zkServer.sh文件内容找到服务启动类: org.apache.zookeeper.server.quorum.QuorumPeerMain05选举流程代码解析 理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目标页面展示到屏幕。

图3 选举代码实现流程图1. 加载配置文件QuorumPeerConfig.parse(path);针对 Leader选举关键配置信息如下:读取dataDir目录找到myid文件内容,设置当前应用sid标识,做为投票人身份信息。

下面遇到myid变量为当前节点自己sid标识设置peerType当前应用是否参与选举new QuorumMaj()解析server.前缀加载集群成员信息,加载allMembers所有成员,votingMembers

参与选举成员,observingMembers观察者成员,设置half值votingMembers.size()/2.【Java】public QuorumMaj(Properties props) throws ConfigException {

for (Entry entry : props.entrySet()) {String key = entry.getKey().toString();String value = entry.getValue().toString();

//读取集群配置文件中的server.开头的应用实例配置信息if (key.startsWith(“server.”)) {int dot = key.indexOf(.);long sid = Long.parseLong(key.substring(dot + 1));

QuorumServer qs = new QuorumServer(sid, value);allMembers.put(Long.valueOf(sid), qs);if (qs.type == LearnerType.PARTICIPANT)

//应用实例绑定的角色为PARTICIPANT意为参与选举votingMembers.put(Long.valueOf(sid), qs);else {//观察者成员observingMembers.put(Long.valueOf(sid), qs);

}} else if (key.equals(“version”)) {version = Long.parseLong(value, 16);}}//过半基数half = votingMembers.size() / 2;

}2. QuorumPeerMain.runFromConfig(config) 启动服务;3. QuorumPeer.startLeaderElection() 开启选举服务;设置当前选票new Vote(sid,zxid,epoch)

【plain】synchronized public void startLeaderElection(){try {if (getPeerState() == ServerState.LOOKING) {

//首轮:当前节点默认投票对象为自己currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());}} catch(IOException e) {

RuntimeException re = new RuntimeException(e.getMessage());re.setStackTrace(e.getStackTrace());throw re;

}//……..}创建选举管理类:QuorumCnxnManager;初始化recvQueue接收投票队列(第二层传输队列);初始化queueSendMap按sid发送投票队列(第二层传输队列);

初始化senderWorkerMap发送投票工作线程容器,表示着与sid投票节点已连接;初始化选举监听线程类QuorumCnxnManager.Listener【Java】//QuorumPeer.createCnxnManager()。

public QuorumCnxManager(QuorumPeer self,final long mySid,Map view,QuorumAuthServer authServer,QuorumAuthLearner authLearner,

int socketTimeout,boolean listenOnAllIPs,int quorumCnxnThreadsSize,boolean quorumSaslAuthEnabled) {//接收投票队列(第二层传输队列)

this.recvQueue = new ArrayBlockingQueue(RECV_CAPACITY);//按sid发送投票队列(第二层传输队列)this.queueSendMap = new ConcurrentHashMap>();

//发送投票工作线程容器,表示着与sid投票节点已连接this.senderWorkerMap = new ConcurrentHashMap();this.lastMessageSent = new ConcurrentHashMap();

String cnxToValue = System.getProperty(“zookeeper.cnxTimeout”);if(cnxToValue != null){this.cnxTO = Integer.parseInt(cnxToValue);

}this.self = self;this.mySid = mySid;this.socketTimeout = socketTimeout;this.view = view;this.listenOnAllIPs = listenOnAllIPs;

initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,quorumSaslAuthEnabled);// Starts listener thread that waits for connection requests

//创建选举监听线程 接收选举投票请求listener = new Listener();listener.setName(“QuorumPeerListener”);}//QuorumPeer.createElectionAlgorithm

protected Election createElectionAlgorithm(int electionAlgorithm){Election le=null;//TODO: use a factory rather than a switch

switch (electionAlgorithm) {case 0:le = new LeaderElection(this);break;case 1:le = new AuthFastLeaderElection(this);

break;case 2:le = new AuthFastLeaderElection(this, true);break;case 3:qcm = createCnxnManager();// new QuorumCnxManager(… new Listener())

QuorumCnxManager.Listener listener = qcm.listener;if(listener != null){listener.start();//启动选举监听线程FastLeaderElection fle = new FastLeaderElection(this, qcm);

fle.start();le = fle;} else {LOG.error(“Null listener when initializing cnx manager”);}break;default:

assert false;}return le;}4. 开启选举监听线程QuorumCnxnManager.Listener;创建ServerSockket等待大于自己sid节点连接,连接信息存储到senderWorkerMap;

sid>self.sid才可以连接过来【Java】//上面的listener.start()执行后,选择此方法public void run() {int numRetries = 0;InetSocketAddress addr;。

Socket client = null;while((!shutdown) && (numRetries < 3)){try {ss = new ServerSocket();ss.setReuseAddress(true);

if (self.getQuorumListenOnAllIPs()) {int port = self.getElectionAddress().getPort();addr = new InetSocketAddress(port);

} else {// Resolve hostname for this server in case the// underlying ip address has changed.self.recreateSocketAddresses(self.getId());

addr = self.getElectionAddress();}LOG.info(“My election bind port: ” + addr.toString());setName(addr.toString());

ss.bind(addr);while (!shutdown) {client = ss.accept();setSockOpts(client);LOG.info(“Received connection request “

+ client.getRemoteSocketAddress());// Receive and handle the connection request// asynchronously if the quorum sasl authentication is

// enabled. This is required because sasl server// authentication process may take few seconds to finish,

// this may delay next peer connection requests.if (quorumSaslAuthEnabled) {receiveConnectionAsync(client);

} else {//接收连接信息receiveConnection(client);}numRetries = 0;}} catch (IOException e) {if (shutdown) {break;

}LOG.error(“Exception while listening”, e);numRetries++;try {ss.close();Thread.sleep(1000);} catch (IOException ie) {

LOG.error(“Error closing server socket”, ie);} catch (InterruptedException ie) {LOG.error(“Interrupted while sleeping. ” +

“Ignoring exception”, ie);}closeSocket(client);}}LOG.info(“Leaving listener”);if (!shutdown) {LOG.error(“As Im leaving the listener thread, “

+ “I wont be able to participate in leader “+ “election any longer: “+ self.getElectionAddress());} else if (ss != null) {

// Clean up for shutdown.try {ss.close();} catch (IOException ie) {// Dont log an error for shutdown.

LOG.debug(“Error closing server socket”, ie);}}}//代码执行路径:receiveConnection()->handleConnection(…)private void handleConnection(Socket sock, DataInputStream din)

throws IOException {//…省略if (sid < self.getId()) {/** This replica might still believe that the connection to sid is

* up, so we have to shut down the workers before trying to open a* new connection.*/SendWorker sw = senderWorkerMap.get(sid);

if (sw != null) {sw.finish();}/** Now we start a new connection*/LOG.debug(“Create new connection to server: {}”, sid);

closeSocket(sock);if (electionAddr != null) {connectOne(sid, electionAddr);} else {connectOne(sid);}} else { // Otherwise start worker threads to receive data.

SendWorker sw = new SendWorker(sock, sid);RecvWorker rw = new RecvWorker(sock, din, sid, sw);sw.setRecv(rw);

SendWorker vsw = senderWorkerMap.get(sid);if (vsw != null) {vsw.finish();}//存储连接信息senderWorkerMap.put(sid, sw);

queueSendMap.putIfAbsent(sid,new ArrayBlockingQueue(SEND_CAPACITY));sw.start();rw.start();}}5. 创建FastLeaderElection快速选举服务;

初始选票发送队列sendqueue(第一层队列)初始选票接收队列recvqueue(第一层队列)创建线程WorkerSender创建线程WorkerReceiver【Java】//FastLeaderElection.starter

private void starter(QuorumPeer self, QuorumCnxManager manager) {this.self = self;proposedLeader = -1;

proposedZxid = -1;//发送队列sendqueue(第一层队列)sendqueue = new LinkedBlockingQueue();//接收队列recvqueue(第一层队列)recvqueue = new LinkedBlockingQueue();

this.messenger = new Messenger(manager);}//new Messenger(manager)Messenger(QuorumCnxManager manager) {

//创建线程WorkerSenderthis.ws = new WorkerSender(manager);this.wsThread = new Thread(this.ws,”WorkerSender[myid=” + self.getId() + “]”);

this.wsThread.setDaemon(true);//创建线程WorkerReceiverthis.wr = new WorkerReceiver(manager);this.wrThread = new Thread(this.wr,

“WorkerReceiver[myid=” + self.getId() + “]”);this.wrThread.setDaemon(true);}6. 开启WorkerSender和WorkerReceiver线程。

WorkerSender线程自旋获取sendqueue第一层队列元素sendqueue队列元素内容为相关选票信息详见ToSend类;首先判断选票sid是否和自己sid值相同,相等直接放入到recvQueue队列中;

不相同将sendqueue队列元素转储到queueSendMap第二层传输队列中【Java】//FastLeaderElection.Messenger.WorkerSenderclass WorkerSender extends ZooKeeperThread{。

//…public void run() {while (!stop) {try {ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);if(m == null) continue;

//将投票信息发送出去process(m);} catch (InterruptedException e) {break;}}LOG.info(“WorkerSender is down”);}}//QuorumCnxManager#toSend

public void toSend(Long sid, ByteBuffer b) {/** If sending message to myself, then simply enqueue it (loopback).

*/if (this.mySid == sid) {b.position(0);addToRecvQueue(new Message(b.duplicate(), sid));/** Otherwise send to the corresponding thread to send.

*/} else {/** Start a new connection if doesnt have one already.*/ArrayBlockingQueue bq = new ArrayBlockingQueue(

SEND_CAPACITY);ArrayBlockingQueue oldq = queueSendMap.putIfAbsent(sid, bq);//转储到queueSendMap第二层传输队列中if (oldq != null) {

addToSendQueue(oldq, b);} else {addToSendQueue(bq, b);}connectOne(sid);}} WorkerReceiver线程自旋获取recvQueue第二层传输队列元素转存到recvqueue第一层队列中。

【Java】//WorkerReceiverpublic void run() {Message response;while (!stop) {// Sleeps on receivetry {//自旋获取recvQueue第二层传输队列元素

response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);if(response == null) continue;// The current protocol and two previous generations all send at least 28 bytes

if (response.buffer.capacity() < 28) {LOG.error(“Got a short response: ” + response.buffer.capacity());

continue;}//…if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){//第二层传输队列元素转存到recvqueue第一层队列中

recvqueue.offer(n);//…}}//…}06选举核心逻辑 理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目标页面展示到屏幕。

1. 启动线程QuorumPeer开始Leader选举投票makeLEStrategy().lookForLeader();sendNotifications()向其它节点发送选票信息,选票信息存储到sendqueue队列中。

sendqueue队列由WorkerSender线程处理【plain】//QuorunPeer.run//…try {reconfigFlagClear();if (shuttingDownLE) {

shuttingDownLE = false;startLeaderElection();}//makeLEStrategy().lookForLeader() 发送投票setCurrentVote(makeLEStrategy().lookForLeader());

} catch (Exception e) {LOG.warn(“Unexpected exception”, e);setPeerState(ServerState.LOOKING);}//…//FastLeaderElection.lookLeader

public Vote lookForLeader() throws InterruptedException {//…//向其他应用发送投票sendNotifications();//…}private void sendNotifications() {

//获取应用节点for (long sid : self.getCurrentAndNextConfigVoters()) {QuorumVerifier qv = self.getQuorumVerifier();

ToSend notmsg = new ToSend(ToSend.mType.notification,proposedLeader,proposedZxid,logicalclock.get(),QuorumPeer.ServerState.LOOKING,

sid,proposedEpoch, qv.toString().getBytes());if(LOG.isDebugEnabled()){LOG.debug(“Sending Notification: ” + proposedLeader + ” (n.leader), 0x” +

Long.toHexString(proposedZxid) + ” (n.zxid), 0x” + Long.toHexString(logicalclock.get()) +” (n.round), ” + sid + ” (recipient), ” + self.getId() +

” (myid), 0x” + Long.toHexString(proposedEpoch) + ” (n.peerEpoch)”);}//储存投票信息sendqueue.offer(notmsg);

}}class WorkerSender extends ZooKeeperThread {//…public void run() {while (!stop) {try {//提取已储存的投票信息

ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);if(m == null) continue;process(m);} catch (InterruptedException e) {

break;}}LOG.info(“WorkerSender is down”);}//…}自旋recvqueue队列元素获取投票过来的选票信息:【Java】public Vote lookForLeader() throws InterruptedException {

//…/** Loop in which we exchange notifications until we find a leader*/while ((self.getPeerState() == ServerState.LOOKING) &&

(!stop)){/** Remove next notification from queue, times out after 2 times* the termination time*///提取投递过来的选票信息

Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS);/** Sends more notifications if havent received enough.

* Otherwise processes new notification.*/if(n == null){if(manager.haveDelivered()){//已全部连接成功,并且前一轮投票都完成,需要再次发起投票

sendNotifications();} else {//如果未收到选票信息,manager.contentAll()自动连接其它socket节点manager.connectAll();}/** Exponential backoff

*/int tmpTimeOut = notTimeout*2;notTimeout = (tmpTimeOut < maxNotificationInterval?tmpTimeOut : maxNotificationInterval);

LOG.info(“Notification time out: ” + notTimeout);}//….}//…}【Java】//manager.connectAll()->connectOne(sid)->initiateConnection(…)->startConnection(…)

private boolean startConnection(Socket sock, Long sid)throws IOException {DataOutputStream dout = null;

DataInputStream din = null;try {// Use BufferedOutputStream to reduce the number of IP packets. This is

// important for x-DC scenarios.BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());

dout = new DataOutputStream(buf);// Sending id and challenge// represents protocol version (in other words – message type)

dout.writeLong(PROTOCOL_VERSION);dout.writeLong(self.getId());String addr = self.getElectionAddress().getHostString() + “:” + self.getElectionAddress().getPort();

byte[] addr_bytes = addr.getBytes();dout.writeInt(addr_bytes.length);dout.write(addr_bytes);dout.flush();

din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));} catch (IOException e) {LOG.warn(“Ignoring exception reading or writing challenge: “, e);

closeSocket(sock);return false;}// authenticate learnerQuorumPeer.QuorumServer qps = self.getVotingView().get(sid);

if (qps != null) {// TODO – investigate why reconfig makes qps null.authLearner.authenticate(sock, qps.hostname);

}// If lost the challenge, then drop the new connection//保证集群中所有节点之间只有一个通道连接if (sid > self.getId()) {

LOG.info(“Have smaller server identifier, so dropping the ” +”connection: (” + sid + “, ” + self.getId() + “)”);

closeSocket(sock);// Otherwise proceed with the connection} else {SendWorker sw = new SendWorker(sock, sid);

RecvWorker rw = new RecvWorker(sock, din, sid, sw);sw.setRecv(rw);SendWorker vsw = senderWorkerMap.get(sid);

if(vsw != null)vsw.finish();senderWorkerMap.put(sid, sw);queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue(

SEND_CAPACITY));sw.start();rw.start();return true;}return false;} 如上述代码中所示,sid>self.sid才可以创建连接Socket和SendWorker,RecvWorker线程,存储到senderWorkerMap中。

对应第2步中的sid

图4 节点之间连接方式【Java】public Vote lookForLeader() throws InterruptedException {//…if (n.electionEpoch > logicalclock.get()) {

//当前选举周期小于选票周期,重置recvset选票池//大于当前周期更新当前选票信息,再次发送投票logicalclock.set(n.electionEpoch);recvset.clear();if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,

getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {updateProposal(n.leader, n.zxid, n.peerEpoch);

} else {updateProposal(getInitId(),getInitLastLoggedZxid(),getPeerEpoch());}sendNotifications();} else if (n.electionEpoch < logicalclock.get()) {

if(LOG.isDebugEnabled()){LOG.debug(“Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x”

+ Long.toHexString(n.electionEpoch)+ “, logicalclock=0x” + Long.toHexString(logicalclock.get()));}break;

} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)) {//相同选举周期

//接收的选票与当前选票PK成功后,替换当前选票updateProposal(n.leader, n.zxid, n.peerEpoch);sendNotifications();}//…}在上代码中,自旋从recvqueue队列中获取到选票信息。

开始进行选举:判断当前选票和接收过来的选票周期是否一致大于当前周期更新当前选票信息,再次发送投票周期相等:当前选票信息和接收的选票信息进行PK【Java】//接收的选票与当前选票PKprotected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {

LOG.debug(“id: ” + newId + “, proposed id: ” + curId + “, zxid: 0x” +Long.toHexString(newZxid) + “, proposed zxid: 0x” + Long.toHexString(curZxid));

if(self.getQuorumVerifier().getWeight(newId) == 0){return false;}/** We return true if one of the following three cases hold:

* 1- New epoch is higher* 2- New epoch is the same as current epoch, but new zxid is higher* 3- New epoch is the same as current epoch, new zxid is the same

* as current zxid, but server id is higher.*/return ((newEpoch > curEpoch) ||((newEpoch == curEpoch) &&

((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));wId > curId)))));}在上述代码中的totalOrderPredicate方法逻辑如下:

竞选周期大于当前周期为true竞选周期相等,竞选zxid大于当前zxid为true竞选周期相等,竞选zxid等于当前zxid,竞选sid大于当前sid为true经过上述条件判断为true将当前选票信息替换为竞选成功的选票,同时再次将新的选票投出去。

【Java】public Vote lookForLeader() throws InterruptedException {//…//存储节点对应的选票信息// key:选票来源sid value:选票推举的Leader sid

recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));//半数选举开始if (termPredicate(recvset,

new Vote(proposedLeader, proposedZxid,logicalclock.get(), proposedEpoch))) {// Verify if there is any change in the proposed leader

while((n = recvqueue.poll(finalizeWait,TimeUnit.MILLISECONDS)) != null){if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,

proposedLeader, proposedZxid, proposedEpoch)){recvqueue.put(n);break;}}/*WorkerSender* This predicate is true once we dont read any new

* relevant message from the reception queue*/if (n == null) {//已选举出leader 更新当前节点是否为leaderself.setPeerState((proposedLeader == self.getId()) ?

ServerState.LEADING: learningState());Vote endVote = new Vote(proposedLeader,proposedZxid, proposedEpoch);

leaveInstance(endVote);return endVote;}}//…}/*** Termination predicate. Given a set of votes, determines if have

* sufficient to declare the end of the election round.** @param votes* Set of votes* @param vote* Identifier of the vote received last PK后的选票

*/private boolean termPredicate(HashMap votes, Vote vote) {SyncedLearnerTracker voteSet = new SyncedLearnerTracker();

voteSet.addQuorumVerifier(self.getQuorumVerifier());if (self.getLastSeenQuorumVerifier() != null&& self.getLastSeenQuorumVerifier().getVersion() > self

.getQuorumVerifier().getVersion()) {voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());}/** First make the views consistent. Sometimes peers will have different

* zxids for a server depending on timing.*///votes 来源于recvset 存储各个节点推举出来的选票信息for (Map.Entry entry : votes.entrySet()) {

//选举出的sid和其它节点选择的sid相同存储到voteSet变量中if (vote.equals(entry.getValue())) {//保存推举出来的sidvoteSet.addAck(entry.getKey());。

}}//判断选举出来的选票数量是否过半return voteSet.hasAllQuorums();}//QuorumMaj#containsQuorumpublic boolean containsQuorum(Set ackSet) {

return (ackSet.size() > half);}在上述代码中:recvset是存储每个sid推举的选票信息第一轮 sid1:vote(1,0,1) ,sid2:vote(2,0,1);第二轮 sid1:vote(2,0,1) ,sid2:vote(2,0,1)。

最终经过选举信息vote(2,0,1)为推荐leader,并用推荐leader在recvset选票池里比对持相同票数量为2个因为总共有3个节点参与选举,sid1和sid2都选举sid2为leader,满足票数过半要求,故确认sid2为leader。

setPeerState更新当前节点角色;proposedLeader选举出来的sid和自己sid相等,设置为Leader;上述条件不相等,设置为Follower或Observing;更新currentVote当前选票为Leader的选票vote(2,0,1)。

07总结 理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目标页面展示到屏幕。

通过对Leader选举源码的解析,可以了解到:1. 多个应用节点之间网络通信采用BIO方式进行相互投票,同时保证每个节点之间只使用一个通道,减少网络资源的消耗,足以见得在BIO分布式中间件开发中的技术重要性。

2. 基于BIO的基础上,灵活运用多线程和内存消息队列完好实现多层队列架构,每层队列由不同的线程分工协作,提高快速选举性能目的 3. 为BIO在多线程技术上的实践带来了宝贵的经验求分享求点赞求在看

© 版权声明
THE END
喜欢就支持一下吧
点赞0赞赏 分享
相关推荐
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容