主頁 > 後端開發 > Zookeeper選舉Leader原始碼剖析

Zookeeper選舉Leader原始碼剖析

2022-10-21 07:54:31 後端開發

 

開始分析

  【1】分析入口類做了什么

//org.apache.zookeeper.server.quorum包下QuorumPeerMain類
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {..} catch (ConfigException e) {..} catch (DatadirException e) {..} catch (AdminServerException e) {..} catch (Exception e) {..}

    ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue());
}

protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        //決議組態檔加載到記憶體
        //主要是呼叫了QuorumPeerConfig類#parse方法,決議邏輯在parseProperties方法
        config.parse(args[0]);
    }

    //啟動延時的定期清理快照資料檔案
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
        config.getDataDir(),
        config.getDataLogDir(),
        config.getSnapRetainCount(),
        config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        //集群的入口
        runFromConfig(config);
    } else {
        //單機的入口
        ZooKeeperServerMain.main(args);
    }
}

  【2】runFromConfig方法做了什么

public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer, myid=" + config.getServerId());
    final MetricsProvider metricsProvider;
    try {
        metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
            config.getMetricsProviderClassName(),
            config.getMetricsProviderConfiguration());
    } catch (MetricsProviderLifeCycleException error) {
        throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
    }
    try {
        ServerMetrics.metricsProviderInitialized(metricsProvider);
        ProviderRegistry.initialize();
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        if (config.getClientPortAddress() != null) {
            //初始化服務端連接物件
            cnxnFactory = ServerCnxnFactory.createFactory();
            //設定監聽埠,從組態檔中拿
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
        }

        //構建本機節點,并將配置引數的資料傳入
        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
        quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        //將連接物件也存入本節點
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }
        quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
        quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
        quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());

        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();

        if (config.jvmPauseMonitorToRun) {
            quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
        }
        //啟動節點
        quorumPeer.start();
        ZKAuditProvider.addZKStartStopAuditLog();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    } finally {
        try {
            metricsProvider.stop();
        } catch (Throwable error) {
            LOG.warn("Error while stopping metrics", error);
        }
    }
}

 

  【3】通信物件的選擇

//ServerCnxnFactory類#createFactory方法
//初始化通信物件
public static ServerCnxnFactory createFactory() throws IOException {
    //屬性值展示:String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory"
    //官方推薦netty:則應該是ServerCnxnFactory類的子類NettyServerCnxnFactory
    String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
    if (serverCnxnFactoryName == null) {
        //但是默認是子類NIOServerCnxnFactory
        serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
    }
    try {
        //利用反射進行初始化
        ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName).getDeclaredConstructor().newInstance();
        LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
        return serverCnxnFactory;
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName, e);
        throw ioe;
    }
}

 

  【4】記憶體資料庫的設計

//org.apache.zookeeper.server包下DataTree類
//節點資料是final NodeHashMap nodes;
public class DataTree {

    private static final Logger LOG = LoggerFactory.getLogger(DataTree.class);

    private final RateLogger RATE_LOGGER = new RateLogger(LOG, 15 * 60 * 1000);

    //該映射提供了對datanode的快速查找
    private final NodeHashMap nodes;

    private IWatchManager dataWatches;

    private IWatchManager childWatches;

    //快取所有datanode的路徑和資料的總大小
    private final AtomicLong nodeDataSize = new AtomicLong(0);

    //根結點
    private static final String rootZookeeper = "/";

    private static final String procZookeeper = Quotas.procZookeeper;

    private static final String procChildZookeeper = procZookeeper.substring(1);

    private static final String quotaZookeeper = Quotas.quotaZookeeper;

    private static final String quotaChildZookeeper = quotaZookeeper.substring(procZookeeper.length() + 1);

    private static final String configZookeeper = ZooDefs.CONFIG_NODE;

    private static final String configChildZookeeper = configZookeeper.substring(procZookeeper.length() + 1);

    private final PathTrie pTrie = new PathTrie();

    public static final int STAT_OVERHEAD_BYTES = (6 * 8) + (5 * 4);

    private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();

    private final Set<String> containers = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    private final Set<String> ttls = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    private final ReferenceCountedACLCache aclCache = new ReferenceCountedACLCache();

    public static final int DIGEST_LOG_LIMIT = 1024;

    public static final int DIGEST_LOG_INTERVAL = 128;

    private ZxidDigest digestFromLoadedSnapshot;

    private volatile ZxidDigest lastProcessedZxidDigest;

    private boolean firstMismatchTxn = true;

    private final List<DigestWatcher> digestWatchers = new ArrayList<>();

    private LinkedList<ZxidDigest> digestLog = new LinkedList<>();

    private final DigestCalculator digestCalculator;

}

public class DataNode implements Record {

    private volatile long digest;

    // 指示該節點的摘要是否是最新的
    volatile boolean digestCached;

    byte[] data;

    Long acl;

    public StatPersisted stat;

    private Set<String> children = null;

    private static final Set<String> EMPTY_SET = Collections.emptySet();
}

 

  【5】quorumPeer.start()節點啟動方法又做了什么

@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    //加載快照檔案資料到記憶體
    loadDataBase();
    //啟動通信物件
    startServerCnxnFactory();
    try {
        //JettyAdminServer,啟動內嵌Jetty服務,默認8080埠
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
    }
    //初始化選舉資料
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}

private void startServerCnxnFactory() {
    if (cnxnFactory != null) {
        //如果有配置netty通信,則NettyServerCnxnFactory類#start方法
        cnxnFactory.start();
    }
    if (secureCnxnFactory != null) {
        secureCnxnFactory.start();
    }
}

//NettyServerCnxnFactory類#start方法
@Override
public void start() {
    if (listenBacklog != -1) {
        bootstrap.option(ChannelOption.SO_BACKLOG, listenBacklog);
    }
    LOG.info("binding to port {}", localAddress);
    parentChannel = bootstrap.bind(localAddress).syncUninterruptibly().channel();
    // Port changes after bind() if the original port was 0, update
    // localAddress to get the real port.
    localAddress = (InetSocketAddress) parentChannel.localAddress();
    LOG.info("bound to port {}", getLocalPort());
}

//選舉資料構建
public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            //構建選票,myid服務器id標記,最大的事務id,當前服務器的選舉輪次
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    //確定選舉演算法,默認傳的是3
    this.electionAlg = createElectionAlgorithm(electionType);
}

//節點狀態
public enum ServerState {
    LOOKING,    //等待狀態
    FOLLOWING,  //從節點
    LEADING,    //主節點
    OBSERVING    //觀察狀態
}

 

  【6】選舉演算法分析(內涵多層佇列架構

//選舉演算法分析(3.8版本已經將過時的演算法去除了)
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            //啟動監聽執行緒
            listener.start();
            //構建收發訊息執行緒
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

 

  【7】翻閱監聽執行緒 listener 做了什么

//翻閱監聽執行緒做了什么,主要是看run方法
@Override
public void run() {
    if (!shutdown) {
        LOG.debug("Listener thread started, myId: {}", self.getId());
        Set<InetSocketAddress> addresses;

        if (self.getQuorumListenOnAllIPs()) {
            addresses = self.getElectionAddress().getWildcardAddresses();
        } else {
            addresses = self.getElectionAddress().getAllAddresses();
        }

        CountDownLatch latch = new CountDownLatch(addresses.size());
        //回圈的方式針對每個地址構建一個ListenerHandler
        listenerHandlers = addresses.stream().map(address ->
                        new ListenerHandler(address, self.shouldUsePortUnification(), self.isSslQuorum(), latch))
                .collect(Collectors.toList());

        //針對每個ListenerHandler都會有一個對應的執行緒進行處理(執行緒池)
        final ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        try {
            listenerHandlers.forEach(executor::submit);
        } finally {
            // prevent executor's threads to leak after ListenerHandler tasks complete
            executor.shutdown();
        }

        try {
            latch.await();
        } catch (InterruptedException ie) {..} finally {
            // Clean up for shutdown.
            for (ListenerHandler handler : listenerHandlers) {
                try {
                    handler.close();
                } catch (IOException ie) {...}
            }
        }
    }

    LOG.info("Leaving listener");
    if (!shutdown) {
        if (socketException.get()) {
            // After leaving listener thread, the host cannot join the quorum anymore,
            // this is a severe error that we cannot recover from, so we need to exit
            socketBindErrorHandler.run();
        }
    }
}

class ListenerHandler implements Runnable, Closeable {
    private ServerSocket serverSocket;
    private InetSocketAddress address;
    private boolean portUnification;
    private boolean sslQuorum;
    private CountDownLatch latch;

    ListenerHandler(InetSocketAddress address, boolean portUnification, boolean sslQuorum,CountDownLatch latch) {
        this.address = address;
        this.portUnification = portUnification;
        this.sslQuorum = sslQuorum;
        this.latch = latch;
    }

    /**
     * Sleeps on acceptConnections().
     */
    @Override
    public void run() {
        try {
            Thread.currentThread().setName("ListenerHandler-" + address);
            //建立連接
            acceptConnections();
            try {
                close();
            } catch (IOException e) {...}
        } catch (Exception e) {...} finally {
            latch.countDown();
        }
    }

    @Override
    public synchronized void close() throws IOException {
        if (serverSocket != null && !serverSocket.isClosed()) {
            LOG.debug("Trying to close listeners: {}", serverSocket);
            serverSocket.close();
        }
    }

    /**
     * Sleeps on accept().
     */
    private void acceptConnections() {
        int numRetries = 0;
        Socket client = null;

        while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
            try {
                //創建serverSocket
                serverSocket = createNewServerSocket();
                LOG.info("{} is accepting connections now, my election bind port: {}", QuorumCnxManager.this.mySid, address.toString());
                while (!shutdown) {
                    try {
                        client = serverSocket.accept();
                        setSockOpts(client);
              //處理連接訊息
                        if (quorumSaslAuthEnabled) {
                            receiveConnectionAsync(client);
                        } else {
                            receiveConnection(client);
                        }
                        numRetries = 0;
                    } catch (SocketTimeoutException e) {...}
                }
            } catch (IOException e) {
                if (shutdown) {
                    break;
                }

                if (e instanceof SocketException) {
                    socketException.set(true);
                }

                numRetries++;
                try {
                    close();
                    Thread.sleep(1000);
                } catch (IOException ie) {...} catch (InterruptedException ie) {...}
                closeSocket(client);
            }
        }
        if (!shutdown) {...}
    }

    private ServerSocket createNewServerSocket() throws IOException {
        ServerSocket socket;

        if (portUnification) {
            LOG.info("Creating TLS-enabled quorum server socket");
            socket = new UnifiedServerSocket(self.getX509Util(), true);
        } else if (sslQuorum) {
            LOG.info("Creating TLS-only quorum server socket");
            socket = new UnifiedServerSocket(self.getX509Util(), false);
        } else {
            socket = new ServerSocket();
        }

        socket.setReuseAddress(true);
        address = new InetSocketAddress(address.getHostString(), address.getPort());
        //系結地址與埠
        socket.bind(address);

        return socket;
    }
}

 

  【7.1】receiveConnection方法怎么處理接收到的訊息

public void receiveConnection(final Socket sock) {
    DataInputStream din = null;
    try {
        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));

        handleConnection(sock, din);
    } catch (IOException e) {
        closeSocket(sock);
    }
}

private void handleConnection(Socket sock, DataInputStream din) throws IOException {
    Long sid = null, protocolVersion = null;
    MultipleAddresses electionAddr = null;

    try {
        // 從輸入流中讀入一個Long(實際上是服務的ID)
        protocolVersion = din.readLong();
        if (protocolVersion >= 0) { // this is a server id and not a protocol version
            sid = protocolVersion;
        } else {
            try {
                InitialMessage init = InitialMessage.parse(protocolVersion, din);
                sid = init.sid;
                if (!init.electionAddr.isEmpty()) {
                    electionAddr = new MultipleAddresses(init.electionAddr, Duration.ofMillis(self.getMultiAddressReachabilityCheckTimeoutMs()));
                }
            } catch (InitialMessage.InitialMessageException ex) {
                closeSocket(sock);
                return;
            }
        }

        if (sid == QuorumPeer.OBSERVER_ID) {
            sid = observerCounter.getAndDecrement();
        }
    } catch (IOException e) {
        closeSocket(sock);
        return;
    }

    // do authenticating learner
    authServer.authenticate(sock, din);
    //關閉不必要的連接
    //因為socket是雙工的,而之前我們是針對了每個服務都要與之建立連接(則有,我連它【自身發起的連接】,它連了我【對方發起的連接】)
    //說白了兩條通道有一條不是必要的
    if (sid < self.getId()) { //對方的id小于自身id
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }
        // 關閉當前連接
        closeSocket(sock);
        // 創建當前節點到對面節點的連接
        if (electionAddr != null) {
            connectOne(sid, electionAddr);
        } else {
            connectOne(sid);
        }

    } 
    //自身的話不需要做什么
    else if (sid == self.getId()) {...} 
    else { // 對方id大于自身id
        // 使用目標節點到當前節點的連接
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if (vsw != null) {
            vsw.finish();
        }
        //更新senderWorker與queueSend
        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));

        sw.start();
        rw.start();
    }
}

// 創建當前節點到對面節點的連接
synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) {
    // 判斷連接是否已經存在
    if (senderWorkerMap.get(sid) != null) {
        if (self.isMultiAddressEnabled() && electionAddr.size() > 1 && self.isMultiAddressReachabilityCheckEnabled()) {
            senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
        }
        return true;
    }
    //初始化連接
    return initiateConnectionAsync(electionAddr, sid);
}

public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid) {
    if (!inprogressConnections.add(sid)) {
        return true;
    }
    try {
        connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));
        connectionThreadCnt.incrementAndGet();
    } catch (Throwable e) {
        inprogressConnections.remove(sid);
        return false;
    }
    return true;
}

//QuorumConnectionReqThread類#run方法
@Override
public void run() {
    try {
        initiateConnection(electionAddr, sid);
    } finally {
        inprogressConnections.remove(sid);
    }
}
//真正建立socket連接
public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) {
    Socket sock = null;
    try {
        if (self.isSslQuorum()) {
            sock = self.getX509Util().createSSLSocket();
        } else {
            sock = SOCKET_FACTORY.get();
        }
        setSockOpts(sock);
        sock.connect(electionAddr.getReachableOrOne(), cnxTO);
        if (sock instanceof SSLSocket) {
            SSLSocket sslSock = (SSLSocket) sock;
            sslSock.startHandshake();
        }
    } catch (X509Exception e) {
        closeSocket(sock);
        return;
    } catch (UnresolvedAddressException | IOException e) {
        closeSocket(sock);
        return;
    }

    try {
        startConnection(sock, sid);
    } catch (IOException e) {
        closeSocket(sock);
    }
}

private boolean startConnection(Socket sock, Long sid) throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
        dout = new DataOutputStream(buf);

        long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
        dout.writeLong(protocolVersion);
        dout.writeLong(self.getId());

        // now we send our election address. For the new protocol version, we can send multiple addresses.
        Collection<InetSocketAddress> addressesToSend = protocolVersion == PROTOCOL_VERSION_V2
                ? self.getElectionAddress().getAllAddresses()
                : Arrays.asList(self.getElectionAddress().getOne());

        String addr = addressesToSend.stream()
                .map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();

        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        closeSocket(sock);
        return false;
    }

    // authenticate learner
    QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
    if (qps != null) {
        authLearner.authenticate(sock, qps.hostname);
    }

    // If lost the challenge, then drop the new connection
    if (sid > self.getId()) {
        closeSocket(sock);
    } else {
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);
        if (vsw != null) {
            vsw.finish();
        }

        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));

        sw.start();
        rw.start();
        return true;
    }
    return false;
}

 

  【8】構建FastLeaderElection做了什么

//構建FastLeaderElection做了什么
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

public static class Notification {

    public static final int CURRENTVERSION = 0x2;
    int version;
    long leader;
    long zxid;
    long electionEpoch;
     QuorumPeer.ServerState state;
     long sid;
    QuorumVerifier qv;
     long peerEpoch;
}

public static class ToSend {

    enum mType {
        crequest,
        challenge,
        notification,
        ack
    }

     long leader;
     long zxid;
     long electionEpoch;
     QuorumPeer.ServerState state;
     long sid;
     byte[] configData =https://www.cnblogs.com/chafry/archive/2022/10/20/ dummyData;
    long peerEpoch;
}

Messenger(QuorumCnxManager manager) {
    this.ws = new WorkerSender(manager);

    this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);

    this.wr = new WorkerReceiver(manager);

    this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}

//FastLeaderElection的start()方法做了什么
public void start() {
    this.messenger.start();
}

void start() {
    this.wsThread.start();
    this.wrThread.start();
}

 

  【9】分析發送作業者WorkerSender做了什么

//發送作業者WorkerSender做了什么
class WorkerSender extends ZooKeeperThread {
    volatile boolean stop;
    QuorumCnxManager manager;

    WorkerSender(QuorumCnxManager manager) {
        super("WorkerSender");
        this.stop = false;
        this.manager = manager;
    }

    public void run() {
        while (!stop) {
            try {
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if (m == null) {  continue;  }
                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
    }

    void process(ToSend m) {
        ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData);
        manager.toSend(m.sid, requestBuffer);
    }
}

//QuorumCnxManager類#toSend方法
public void toSend(Long sid, ByteBuffer b) {
    //If sending message to myself, then simply enqueue it (loopback).
    if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
        //Otherwise send to the corresponding thread to send.

    } else {
        //應用層的發送佇列陣列,每個服務器對應一個佇列,用他們的機器ID作為下標.
        BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
        addToSendQueue(bq, b);
        connectOne(sid);
    }
}

 

  【10】分析接收作業者WorkerReceiver做了什么

//接收作業者
class WorkerReceiver extends ZooKeeperThread {

    volatile boolean stop;
    QuorumCnxManager manager;

    WorkerReceiver(QuorumCnxManager manager) {
        super("WorkerReceiver");
        this.stop = false;
        this.manager = manager;
    }

    public void run() {

        Message response;
        while (!stop) {
            // Sleeps on receive
            try {
                response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                if (response == null) {
                    continue;
                }

                final int capacity = response.buffer.capacity();

                // The current protocol and two previous generations all send at least 28 bytes
                if (capacity < 28) {
                    continue;
                }

                // this is the backwardCompatibility mode in place before ZK-107
                // It is for a version of the protocol in which we didn't send peer epoch
                // With peer epoch and version the message became 40 bytes
                boolean backCompatibility28 = (capacity == 28);

                // this is the backwardCompatibility mode for no version information
                boolean backCompatibility40 = (capacity == 40);

                response.buffer.clear();

                // Instantiate Notification and set its attributes
                Notification n = new Notification();

                int rstate = response.buffer.getInt();
                long rleader = response.buffer.getLong();
                long rzxid = response.buffer.getLong();
                long relectionEpoch = response.buffer.getLong();
                long rpeerepoch;

                int version = 0x0;
                QuorumVerifier rqv = null;

                try {
                    if (!backCompatibility28) {
                        rpeerepoch = response.buffer.getLong();
                        if (!backCompatibility40) {
                            version = response.buffer.getInt();
                        } else {...}
                    } else {
                        rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
                    }

                    // check if we have a version that includes config. If so extract config info from message.
                    if (version > 0x1) {
                        int configLength = response.buffer.getInt();

                        // we want to avoid errors caused by the allocation of a byte array with negative length
                        // (causing NegativeArraySizeException) or huge length (causing e.g. OutOfMemoryError)
                        if (configLength < 0 || configLength > capacity) {
                            throw new IOException(...);
                        }

                        byte[] b = new byte[configLength];
                        response.buffer.get(b);

                        synchronized (self) {
                            try {
                                rqv = self.configFromString(new String(b, UTF_8));
                                QuorumVerifier curQV = self.getQuorumVerifier();
                                if (rqv.getVersion() > curQV.getVersion()) {
                                    if (self.getPeerState() == ServerState.LOOKING) {
                                        self.processReconfig(rqv, null, null, false);
                                        if (!rqv.equals(curQV)) {
                                            self.shuttingDownLE = true;
                                            self.getElectionAlg().shutdown();

                                            break;
                                        }
                                    } else {...}
                                }
                            } catch (IOException | ConfigException e) {...}
                        }
                    } else {...}
                } catch (BufferUnderflowException | IOException e) {
                    continue;
                }
                /*
                 * If it is from a non-voting server (such as an observer or
                 * a non-voting follower), respond right away.
                 */
                if (!validVoter(response.sid)) {
                    Vote current = self.getCurrentVote();
                    QuorumVerifier qv = self.getQuorumVerifier();
                    ToSend notmsg = new ToSend(
                        ToSend.mType.notification,
                        current.getId(),
                        current.getZxid(),
                        logicalclock.get(),
                        self.getPeerState(),
                        response.sid,
                        current.getPeerEpoch(),
                        qv.toString().getBytes(UTF_8));

                    sendqueue.offer(notmsg);
                } else {
                    // Receive new message
                    // State of peer that sent this message
                    QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                    switch (rstate) {
                    case 0:
                        ackstate = QuorumPeer.ServerState.LOOKING;
                        break;
                    case 1:
                        ackstate = QuorumPeer.ServerState.FOLLOWING;
                        break;
                    case 2:
                        ackstate = QuorumPeer.ServerState.LEADING;
                        break;
                    case 3:
                        ackstate = QuorumPeer.ServerState.OBSERVING;
                        break;
                    default:
                        continue;
                    }

                    n.leader = rleader;
                    n.zxid = rzxid;
                    n.electionEpoch = relectionEpoch;
                    n.state = ackstate;
                    n.sid = response.sid;
                    n.peerEpoch = rpeerepoch;
                    n.version = version;
                    n.qv = rqv;

                    //如果這個服務器正處于looking狀態,那么發送提議leader
                    if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                        recvqueue.offer(n);

                        /*
                         * Send a notification back if the peer that sent this
                         * message is also looking and its logical clock is
                         * lagging behind.
                         */
                        if ((ackstate == QuorumPeer.ServerState.LOOKING)
                            && (n.electionEpoch < logicalclock.get())) {
                            Vote v = getVote();
                            QuorumVerifier qv = self.getQuorumVerifier();
                            ToSend notmsg = new ToSend(
                                ToSend.mType.notification,
                                v.getId(),
                                v.getZxid(),
                                logicalclock.get(),
                                self.getPeerState(),
                                response.sid,
                                v.getPeerEpoch(),
                                qv.toString().getBytes());
                            sendqueue.offer(notmsg);
                        }
                    } else {
                        //反之,如果選舉結束了,則將自己服務器記錄的leader資訊發送回給對方
                        Vote current = self.getCurrentVote();
                        if (ackstate == QuorumPeer.ServerState.LOOKING) {
                            if (self.leader != null) {
                                if (leadingVoteSet != null) {
                                    self.leader.setLeadingVoteSet(leadingVoteSet);
                                    leadingVoteSet = null;
                                }
                                self.leader.reportLookingSid(response.sid);
                            }

                            QuorumVerifier qv = self.getQuorumVerifier();
                            ToSend notmsg = new ToSend(
                                ToSend.mType.notification,
                                current.getId(),
                                current.getZxid(),
                                current.getElectionEpoch(),
                                self.getPeerState(),
                                response.sid,
                                current.getPeerEpoch(),
                                qv.toString().getBytes());
                            sendqueue.offer(notmsg);
                        }
                    }
                }
            } catch (InterruptedException e) {...}
        }
    }

}

 

  【11】第五步【5】中quorumPeer.start()呼叫了父類的start(),由于父類就是Thread,所以核心在run方法里面

@Override
public void run() {
    updateThreadName();
    //監控部分,進行了省略
    try {
        jmxQuorumBean = new QuorumBean(this);
        .....
    } catch (Exception e) { jmxQuorumBean = null; }

    try {
        //主體邏輯
        while (running) {
            if (unavailableStartTime == 0) {
                unavailableStartTime = Time.currentElapsedTime();
            }

            switch (getPeerState()) {
            case LOOKING:
                LOG.info("LOOKING");
                ServerMetrics.getMetrics().LOOKING_COUNT.add(1);

                if (Boolean.getBoolean("readonlymode.enabled")) {
                    final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {...} catch (Exception e) {...}
                        }
                    };
                    try {
                        roZkMgr.start();
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        //設定當前的投票
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    LOG.info("OBSERVING");
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    updateServerState();

                    // Add delay jitter before we switch to LOOKING
                    // state to reduce the load of ObserverMaster
                    if (isRunning()) {
                        Observer.waitForObserverElectionDelay();
                    }
                }
                break;
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    updateServerState();
                }
                break;
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    updateServerState();
                }
                break;
            }
        }
    } finally {
        LOG.warn("QuorumPeer main thread exited");
        MBeanRegistry instance = MBeanRegistry.getInstance();
        instance.unregister(jmxQuorumBean);
        instance.unregister(jmxLocalPeerBean);

        for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
            instance.unregister(remotePeerBean);
        }

        jmxQuorumBean = null;
        jmxLocalPeerBean = null;
        jmxRemotePeerBean = null;
    }
}

 

  【12】分析核心的選舉流程代碼

//FastLeaderElection類#lookForLeader方法
//核心的選舉演算法
public Vote lookForLeader() throws InterruptedException {
    //監控部分省略
    try {...} catch (Exception e) {...}
    self.start_fle = Time.currentElapsedTime();
    try {
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();
        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
        int notTimeout = minNotificationInterval;

        synchronized (this) {
            //選舉周期自增
            logicalclock.incrementAndGet();
            //更新選票資訊(初始化的情況下是設定選自己)
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        //發送選票
        sendNotifications();

        SyncedLearnerTracker voteSet = null;
        //在這個回圈中,交換通知,直到找到Leader
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            //接收其他人發過來的選票
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            //從recvqueue取出通知為空的情況
            //如廣播出去8個,由于網路原因可能只收到3個,第四次取的時候就是空的
            //可能收到8個了,但是選舉還沒結束,再次取的時候也是空的
            //為了保證選舉還沒結束的時候,能繼續收到其他Server的選票,并繼續處理判斷,直到選出Leader
            if (n == null) {
                //判斷是否已經被交付,即檢查所有佇列是否為空,表示所有訊息都已傳遞
                if (manager.haveDelivered()) {
                    // 重新發送,目的是為了重新再接收
                    sendNotifications();
                } else {
                    // 重新連接zk集群中的每一個server
                    manager.connectAll();
                }

                notTimeout = Math.min(notTimeout << 1, maxNotificationInterval);

                if (self.getQuorumVerifier() instanceof QuorumOracleMaj
                        && self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval)) {
                    setPeerState(proposedLeader, voteSet);
                    Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                    leaveInstance(endVote);
                    return endVote;
                }
            } 
            //驗證發送者的ServerId
            //驗證當前通知推薦的leader的ServerId
            else if (validVoter(n.sid) && validVoter(n.leader)) {
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        break;
                    }
                    if (n.zxid == -1) {
                        break;
                    }
                    // 判斷對方選票的周期是不是比我自身的周期要大
                    // 這種是針對自身曾經宕機過,導致周期比其他人的要小
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    } 
                    //如果對方選票的周期是不是比我自身的周期要小,則是無用票
                    else if (n.electionEpoch < logicalclock.get()) {
                        break;
                    } 
                    //PK邏輯即
                    else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                        //只有對面的比自身的要好,才會從新發起新票
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    // 將選票放入選票的Set集合
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    //獲取voteSet
                    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
                    //判斷獲取到了大多數節點對當前 Vote 的支持
                    if (voteSet.hasAllQuorums()) {
                        //但是這個時候并不能直接判斷 當前Vote 選擇的Leadee就一定是最終的Leader 
                        //先等待 finalizeWait = 200ms 的時長,如果接收到了訊息
                        //且接收到的 Vote 資訊更新,那么放入到接收佇列中,在下一次回圈中再次比較誰的 Vote 更勝一籌
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }
                        //當確實沒有 Vote 訊息再傳進來的時候,可以確認最終的Leader,選舉結束
                        //更新節點狀態 
                        //如果選舉出的 leaderId和自身id一樣,表明自己是Leader ,狀態為LEADING
                        //如果不一樣,表明自己是Follower ,狀態為FOLLOWING
                        //最后,退出選舉程序
                        if (n == null) {
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    break;
                case FOLLOWING:
                    Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
                    if (resultFN == null) {
                        break;
                    } else {
                        return resultFN;
                    }
                case LEADING:
                    Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
                    if (resultLN == null) {
                        break;
                    } else {
                        return resultLN;
                    }
                default:
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {...}
                if (!validVoter(n.sid)) {...}
            }
        }
        return null;
    } finally {
        //監控相關的就省略了
    }
}
//發送選票方法
private void sendNotifications() {
    //回圈拿出可以參加選舉的節點進行發送選票
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        ToSend notmsg = new ToSend(
            ToSend.mType.notification,
            proposedLeader,
            proposedZxid,
            logicalclock.get(),                //當前的選舉周期
            QuorumPeer.ServerState.LOOKING, //當前節點狀態
            sid,
            proposedEpoch,
            qv.toString().getBytes(UTF_8));
        //放入sendqueue佇列【這里面按斬訓圈是會塞入多個,然后等待WorkerSender進行分發】
        sendqueue.offer(notmsg);
    }
}

//判斷交付邏輯
boolean haveDelivered() {
    for (BlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
        final int queueSize = queue.size();
        //只要有一個佇列為0就回傳true,后面就不看了,因為之前說過只要有一個佇列為空,就說明當前Server與zk集群的連接沒有問題
        if (queueSize == 0) {
            return true;
        }
    }
    //只有當所有佇列都不為空,才說明當前Server與zk集群失聯
    return false;
}

//選票的PK邏輯
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }

    //周期高的
    //周期一致,事務id大的
    //周期一致,事務id大的,服務器id大的
    return ((newEpoch > curEpoch)
            || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                    || ((newZxid == curZxid)
                        && (newId > curId)))));
}

 

 

  【13】在上面的流程中已經能夠合理的選出了Leader,此時如果有新的節點加入

//接收作業者WorkerReceiver會判斷自身是否還在選舉階段,不是則會將自己服務器記錄的leader資訊發送回給對方
//則此時新加的機器節點的選票狀態會是FOLLOWING或者LEADING 
private Vote receivedLeadingNotification(Map<Long, Vote> recvset, Map<Long, Vote> outofelection, SyncedLearnerTracker voteSet, Notification n) {

    Vote result = receivedFollowingNotification(recvset, outofelection, voteSet, n);
    if (result == null) {

        if (self.getQuorumVerifier().getNeedOracle() && !self.getQuorumVerifier().askOracle()) {
            setPeerState(n.leader, voteSet);
            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
            leaveInstance(endVote);
            return endVote;
        } else {
            return null;
        }
    } else {
        return result;
    }
}

private Vote receivedFollowingNotification(Map<Long, Vote> recvset, Map<Long, Vote> outofelection, SyncedLearnerTracker voteSet, Notification n) {

    if (n.electionEpoch == logicalclock.get()) {
        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
        voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
        if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
            setPeerState(n.leader, voteSet);
            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
            leaveInstance(endVote);
            return endVote;
        }
    }

    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
    voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
        synchronized (this) {
            logicalclock.set(n.electionEpoch);
            setPeerState(n.leader, voteSet);
        }
        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
        leaveInstance(endVote);
        return endVote;
    }

    return null;
}

 

  【14】總結:整個選舉流程涉及到有價值的點在于:

    1.對于多余的socket連接進行了關閉【但個人覺得,為什么不在監聽者那里就進行判斷呢?這樣可以減少連接,而不是建立了又斷開,重復的搞,可能還有我不清楚的點】

    2.采用了多層佇列架構(異步來優化性能),分別為

構建FastLeaderElection,持有兩個LinkedBlockingQueue:
sendqueue 
recvqueue 
持有messenger,是經過Messenger類包裝過的QuorumCnxManager
messenger的持有:
//兩個執行緒
WorkerSender,負責從sendqueue拿到資料,然后去queueSendMap找對應的sid的佇列塞入資料
WorkerReceiver,負責從manager的recvQueue中拿資料傳到recvqueue
//持有一個佇列
recvQueue

//對應每個socket連接都會有
RecvWorker,負責將DataInputStream里面的資料轉而傳到messenger的recvQueue佇列
SendWorker,負責將各自佇列里面的資料寫入 DataOutputStream
而他們的存放
SendWorker持有RecvWorker
senderWorkerMap{
    ...
    [sid0->SendWorker],
    [sid1->SendWorker]
}
//佇列集合
queueSendMap{
    ...
    [sid0->CircularBlockingQueue],
    [sid1->CircularBlockingQueue]
}

//資料傳輸的流為
DataOutputStream
DataInputStream

 

啟動或leader宕機選舉leader流程

  

leader選舉多層佇列架構

  【1】整個zookeeper選舉底層可以分為選舉應用層和訊息傳輸層,應用層有自己的佇列統一接收和發送選票,傳輸層也設計了自己的佇列,但是按發送的機器分了佇列,避免給每臺機器發送訊息時相互影響,比如某臺機器如果出問題發送不成功則不會影響對正常機器的訊息發送,

   

Leader選舉原始碼流程圖

 

 

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/518716.html

標籤:其他

上一篇:Scala-集合

下一篇:權限類與頻率類

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more