這篇文章主要介紹“zk中l(wèi)earner的作用是什么”,在日常操作中,相信很多人在zk中l(wèi)earner的作用是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對(duì)大家解答”zk中l(wèi)earner的作用是什么”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!
創(chuàng)新互聯(lián)是一家專業(yè)提供忠縣企業(yè)網(wǎng)站建設(shè),專注與網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、成都h5網(wǎng)站建設(shè)、小程序制作等業(yè)務(wù)。10年已為忠縣眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)的建站公司優(yōu)惠進(jìn)行中。
learner時(shí)observer,follower的父類,定義了公共屬性和方法
子類 Follower 和Observer
內(nèi)部類:
PacketInFlight表示在提議中還沒有commit的消息
static class PacketInFlight { TxnHeader hdr; Record rec; }
屬性:
QuorumPeer | 服務(wù)器節(jié)點(diǎn) |
LearnerZooKeeperServer | learner的服務(wù)節(jié)點(diǎn) |
BufferedOutputStream | 輸出流 |
Socket | 端口套接字 |
InetSocketAddress | 地址信息 |
InputArchive | 輸入存檔 |
OutputArchive | 輸出存檔 |
leaderProtocolVersion | leader協(xié)議版本 |
BUFFERED_MESSAGE_SIZE | 緩存信息大小 |
MessageTracker | 順序接收和發(fā)送信息 |
方法
validateSession(ServerCnxn cnxn, long clientId, int timeout) | 驗(yàn)證session有效性 |
writePacket(QuorumPacket pp, boolean flush) | 發(fā)送包給leader |
readPacket(QuorumPacket pp) | 從leader讀取message |
request(Request request) | 發(fā)送request給leader |
findLeader | 查找認(rèn)為是leader的地址信息 |
createSocket() | 創(chuàng)建socket對(duì)象 |
registerWithLeader(int pktType) | 執(zhí)行handshake protocal建立follower/observer連接 |
到服務(wù)器驗(yàn)證session有效性
void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException { LOG.info("Revalidating client: 0x" + Long.toHexString(clientId)); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); dos.writeLong(clientId); dos.writeInt(timeout); dos.close(); QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos.toByteArray(), null); pendingRevalidations.put(clientId, cnxn); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage( LOG, ZooTrace.SESSION_TRACE_MASK, "To validate session 0x" + Long.toHexString(clientId)); } writePacket(qp, true); } void writePacket(QuorumPacket pp, boolean flush) throws IOException { synchronized (leaderOs) { if (pp != null) { messageTracker.trackSent(pp.getType()); leaderOs.writeRecord(pp, "packet"); } if (flush) { bufferedOutput.flush(); } } } void request(Request request) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream oa = new DataOutputStream(baos); oa.writeLong(request.sessionId); oa.writeInt(request.cxid); oa.writeInt(request.type); if (request.request != null) { request.request.rewind(); int len = request.request.remaining(); byte[] b = new byte[len]; request.request.get(b); request.request.rewind(); oa.write(b); } oa.close(); QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo); writePacket(qp, true); } 查找當(dāng)前的leader信息 protected QuorumServer findLeader() { QuorumServer leaderServer = null; // Find the leader by id Vote current = self.getCurrentVote(); for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { // Ensure we have the leader's correct IP address before // attempting to connect. s.recreateSocketAddresses(); leaderServer = s; break; } } if (leaderServer == null) { LOG.warn("Couldn't find the leader with id = " + current.getId()); } return leaderServer; } 連接套接字 sockConnect(Socket sock, InetSocketAddress addr, int timeout) 建立和leader的連接 /** * Establish a connection with the LearnerMaster found by findLearnerMaster. * Followers only connect to Leaders, Observers can connect to any active LearnerMaster. * Retries until either initLimit time has elapsed or 5 tries have happened. * @param addr - the address of the Peer to connect to. * @throws IOException - if the socket connection fails on the 5th attempt * if there is an authentication failure while connecting to leader * @throws X509Exception * @throws InterruptedException */ protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception { this.sock = createSocket(); this.leaderAddr = addr; // leader connection timeout defaults to tickTime * initLimit int connectTimeout = self.tickTime * self.initLimit; // but if connectToLearnerMasterLimit is specified, use that value to calculate // timeout instead of using the initLimit value if (self.connectToLearnerMasterLimit > 0) { connectTimeout = self.tickTime * self.connectToLearnerMasterLimit; } int remainingTimeout; long startNanoTime = nanoTime(); for (int tries = 0; tries < 5; tries++) { try { // recalculate the init limit time because retries sleep for 1000 milliseconds remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1000000); if (remainingTimeout <= 0) { LOG.error("connectToLeader exceeded on retries."); throw new IOException("connectToLeader exceeded on retries."); } sockConnect(sock, addr, Math.min(connectTimeout, remainingTimeout)); if (self.isSslQuorum()) { //開始握手 ((SSLSocket) sock).startHandshake(); } sock.setTcpNoDelay(nodelay); break; } catch (IOException e) { //出現(xiàn)異常 remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1000000); //剩余超時(shí)時(shí)間 if (remainingTimeout <= 1000) { //打印錯(cuò)誤日志 LOG.error("Unexpected exception, connectToLeader exceeded. tries=" + tries + ", remaining init limit=" + remainingTimeout + ", connecting to " + addr, e); throw e; //嘗試次數(shù)大于4 } else if (tries >= 4) { //打印錯(cuò)誤日志 LOG.error("Unexpected exception, retries exceeded. tries=" + tries + ", remaining init limit=" + remainingTimeout + ", connecting to " + addr, e); throw e; } else { //發(fā)出警告 LOG.warn("Unexpected exception, tries=" + tries + ", remaining init limit=" + remainingTimeout + ", connecting to " + addr, e); //重新嘗試建立socket連接 this.sock = createSocket(); } } //讀取配置延時(shí)時(shí)間,默認(rèn)100ns Thread.sleep(leaderConnectDelayDuringRetryMs); } self.authLearner.authenticate(sock, hostname); leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }
到此,關(guān)于“zk中l(wèi)earner的作用是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!
當(dāng)前標(biāo)題:zk中l(wèi)earner的作用是什么
路徑分享:http://sd-ha.com/article4/gcjcoe.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供App設(shè)計(jì)、網(wǎng)站內(nèi)鏈、網(wǎng)站制作、面包屑導(dǎo)航、自適應(yīng)網(wǎng)站、品牌網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)