ZooKeeper (2): How Its Multiple Port Listeners Are Established
  ZooKeeper is an excellent coordination component for distributed systems and is well worth studying. Its entry classes are:
    1. standalone mode: ZooKeeperServerMain;    2. cluster (quorum) mode: QuorumPeerMain.
  There is plenty of communication between clients and servers, and among the servers themselves. It falls into three categories:
    1. communication between clients and the ZooKeeper servers;    2. server-to-server communication for proposal/vote traffic;    3. server-to-server communication for leader election.
  So how is each of these communication channels established? Let's walk through it in this article (presented in reverse order, from the accept loops back to where they are wired up).
A cluster-mode ZooKeeper configuration might look like this:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zk3/data
dataLogDir=/tmp/zk3/log
# client connection port
clientPort=2183
# cluster members: host:quorum-communication-port:election-port
server.1=localhost:2888:3888
server.2=localhost:2899:3899
server.3=localhost:2877:3877
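  To make each server.N entry concrete: the value has three parts -- the host the other peers connect to, the quorum-communication port (proposal/commit traffic between Leader and Followers), and the election port. The snippet below only illustrates that layout; the real parsing lives in QuorumPeerConfig / QuorumPeer.QuorumServer, and the class name here is invented.
// Illustration only (not ZooKeeper code): what one "server.N" entry encodes.
public class ServerLineDemo {
    public static void main(String[] args) {
        String key = "server.1";
        String value = "localhost:2888:3888";

        long sid = Long.parseLong(key.split("\\.")[1]);   // the server id (matches the myid file)
        String[] parts = value.split(":");
        String host = parts[0];                           // address the other peers connect to
        int quorumPort = Integer.parseInt(parts[1]);      // proposal/commit (leader <-> follower) traffic
        int electionPort = Integer.parseInt(parts[2]);    // leader-election traffic

        System.out.printf("sid=%d host=%s quorumPort=%d electionPort=%d%n",
                          sid, host, quorumPort, electionPort);
    }
}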
1. Establishing the listener for client connections
  An accept thread loops endlessly over the server socket, calling select().
// org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread -- listens for client connection requests
public void run() {
    try {
        while (!stopped && !acceptSocket.socket().isClosed()) {
            try {
                select();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }
    } finally {
        closeSelector();
        // This will wake up the selector threads, and tell the
        // worker thread pool to begin shutdown.
        if (!reconfiguring) {
            NIOServerCnxnFactory.this.stop();
        }
        LOG.info("accept thread exitted run method");
    }
}
private void select() {
    try {
        selector.select();

        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped && selectedKeys.hasNext()) {
            SelectionKey key = selectedKeys.next();
            selectedKeys.remove();

            if (!key.isValid()) {
                continue;
            }
            if (key.isAcceptable()) {
                if (!doAccept()) {
                    // If unable to pull a new connection off the accept
                    // queue, pause accepting to give us time to free
                    // up file descriptors and so the accept thread
                    // doesn't spin in a tight loop.
                    pauseAccept(10);
                }
            } else {
                LOG.warn("Unexpected ops in accept select {}", key.readyOps());
            }
        }
    } catch (IOException e) {
        LOG.warn("Ignoring IOException while selecting", e);
    }
}
// The client listener above is set up when configure() is called.
// The addr passed in comes from the clientPort parsed out of the configuration file.
// org.apache.zookeeper.server.NIOServerCnxnFactory
@Override
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
    if (secure) {
        throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    }
    configureSaslLogin();

    maxClientCnxns = maxcc;
    sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
    // We also use the sessionlessCnxnTimeout as expiring interval for
    // cnxnExpiryQueue. These don't need to be the same, but the expiring
    // interval passed into the ExpiryQueue() constructor below should be
    // less than or equal to the timeout.
    cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
    expirerThread = new ConnectionExpirerThread();

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));
    if (numSelectorThreads < 1) {
        throw new IOException("numSelectorThreads must be at least 1");
    }

    numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

    String logMsg = "Configuring NIO connection handler with "
        + (sessionlessCnxnTimeout / 1000) + "s sessionless connection timeout, "
        + numSelectorThreads + " selector thread(s), "
        + (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and "
        + (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers."));
    LOG.info(logMsg);

    for (int i = 0; i < numSelectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(i));
    }

    listenBacklog = backlog;
    // the port listener is created directly with Java NIO
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port {}", addr);
    if (listenBacklog == -1) {
        ss.socket().bind(addr);
    } else {
        ss.socket().bind(addr, listenBacklog);
    }
    ss.configureBlocking(false);
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
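  As a quick sanity check of the selector-thread formula above: on a 32-core machine, sqrt(32 / 2) = 4, so four selector threads are created by default (matching the "32 cores sweet spot" comment), while a 4-core machine gets (int) sqrt(2) = 1, i.e. a single selector thread; the worker pool defaults to 2 * numCores threads in both cases.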
// org.apache.zookeeper.server.quorum.QuorumPeerMain#runFromConfig
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
LOG.info("Starting quorum peer");
MetricsProvider metricsProvider;
try {
metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
config.getMetricsProviderClassName(),
config.getMetricsProviderConfiguration());
} catch (MetricsProviderLifeCycleException error) {
throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
try {
ServerMetrics.metricsProviderInitialized(metricsProvider);
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
// 1. create the client-connection listener
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
// 1.1. plus an optional secure listener for clients
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
// create the quorum peer, which drives the proposal/vote communication
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
// set the election algorithm; the election listener created later depends on it
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
quorumPeer.setConfigFileName(config.getConfigFilename());
quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
// set the quorum-communication and election port information
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier() != null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
quorumPeer.initConfigInZKDatabase();
// hand the client connection factory created above to the peer
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if (quorumPeer.isQuorumSaslAuthEnabled()) {
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
// create the auth server & learner
quorumPeer.initialize();
if (config.jvmPauseMonitorToRun) {
quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
} finally {
if (metricsProvider != null) {
try {
metricsProvider.stop();
} catch (Throwable error) {
LOG.warn("Error while stopping metrics", error);
// org.apache.zookeeper.server.quorum.QuorumPeerMain#initializeAndRun
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
        config.getDataDir(),
        config.getDataLogDir(),
        config.getSnapRetainCount(),
        config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}
  To sum up: at startup, NIOServerCnxnFactory uses Java NIO to create the ServerSocket described by the configuration file and listens on the client port. What happens once a request actually arrives will be covered in a later post.
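  To see the shape of this listener in isolation, here is a minimal, self-contained sketch of the same pattern: a non-blocking ServerSocketChannel registered with a Selector and an accept loop. The class name is invented for the example and port 2183 is just the clientPort from the sample configuration above; the real factory splits this work across AcceptThread, the SelectorThreads and the worker pool.
// Illustration only (not ZooKeeper code): the accept-loop shape used by
// NIOServerCnxnFactory.AcceptThread, reduced to its essentials.
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class MiniAcceptLoop {
    public static void main(String[] args) throws Exception {
        ServerSocketChannel ss = ServerSocketChannel.open();
        ss.socket().setReuseAddress(true);
        ss.socket().bind(new InetSocketAddress(2183));   // clientPort from the sample config
        ss.configureBlocking(false);

        Selector selector = Selector.open();
        ss.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                if (key.isValid() && key.isAcceptable()) {
                    SocketChannel client = ss.accept();
                    client.configureBlocking(false);
                    // ZooKeeper would hand the channel to a SelectorThread here;
                    // the demo just logs and closes it.
                    System.out.println("accepted " + client.getRemoteAddress());
                    client.close();
                }
            }
        }
    }
}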
2. Establishing the election port listener
// The listener for connections from the other servers runs in its own thread
// org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener#run
/**
* Sleeps on accept().
*/
@Override
public void run() {
int numRetries = 0;
InetSocketAddress addr;
Socket client = null;
Exception exitException = null;
while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
try {
// ss is the server socket used for election communication;
// its creation is deferred until this thread actually runs
if (self.shouldUsePortUnification()) {
LOG.info("Creating TLS-enabled quorum server socket");
ss = new UnifiedServerSocket(self.getX509Util(), true);
} else if (self.isSslQuorum()) {
LOG.info("Creating TLS-only quorum server socket");
ss = new UnifiedServerSocket(self.getX509Util(), false);
} else {
// by default a plain (blocking) ServerSocket is used
ss = new ServerSocket();
ss.setReuseAddress(true);
if (self.getQuorumListenOnAllIPs()) {
int port = self.getElectionAddress().getPort();
addr = new InetSocketAddress(port);
} else {
// Resolve hostname for this server in case the
// underlying ip address has changed.
self.recreateSocketAddresses(self.getId());
addr = self.getElectionAddress();
LOG.info("My election bind port: {}", addr.toString());
setName(addr.toString());
// once the port is bound, election communication can begin
ss.bind(addr);
while (!shutdown) {
try {
// block here waiting for connection requests from other nodes
client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request {}", formatInetAddr((InetSocketAddress) client.getRemoteSocketAddress()));
// Receive and handle the connection request
// asynchronously if the quorum sasl authentication is
// enabled. This is required because sasl server
// authentication process may take few seconds to finish,
// this may delay next peer connection requests.
if (quorumSaslAuthEnabled) {
receiveConnectionAsync(client);
} else {
receiveConnection(client);
numRetries = 0;
} catch (SocketTimeoutException e) {
LOG.warn(
"The socket is listening for the election accepted "
+ "and it timed out unexpectedly, but will retry."
+ "see ZOOKEEPER-2836");
} catch (IOException e) {
if (shutdown) {
break;
LOG.error("Exception while listening", e);
exitException = e;
numRetries++;
try {
ss.close();
Thread.sleep(1000);
} catch (IOException ie) {
LOG.error("Error closing server socket", ie);
} catch (InterruptedException ie) {
LOG.error("Interrupted while sleeping. Ignoring exception", ie);
closeSocket(client);
LOG.info("Leaving listener");
if (!shutdown) {
LOG.error(
"As I'm leaving the listener thread after {} errors. "
+ "I won't be able to participate in leader election any longer: {}."
+ "Use {} property to increase retry count.",
numRetries,
formatInetAddr(self.getElectionAddress()),
ELECTION_PORT_BIND_RETRY);
if (exitException instanceof SocketException) {
// After leaving listener thread, the host cannot join the
// quorum anymore, this is a severe error that we cannot
// recover from, so we need to exit
socketBindErrorHandler.run();
} else if (ss != null) {
// Clean up for shutdown.
try {
ss.close();
} catch (IOException ie) {
// Don't log an error for shutdown.
LOG.debug("Error closing server socket", ie);
// org.apache.zookeeper.server.quorum.QuorumPeer#start
// starts the listeners and the internal communication threads
@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();
    // open the client port listener
    startServerCnxnFactory();
    try {
        // start the admin server
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // start leader election -- this is what opens the election port listener
    startLeaderElection();
    // start the JVM-GC pause monitor; it only sleeps and logs long pauses
    startJvmPauseMonitor();
    super.start();
}
// The election messaging threads are WorkerReceiver and WorkerSender; they talk to the election algorithm through queues
// org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerReceiver
// thread that receives election notification messages
public void run() {
Message response;
while (!stop) {
// Sleeps on receive
try {
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if (response == null) {
continue;
// The current protocol and two previous generations all send at least 28 bytes
if (response.buffer.capacity() < 28) {
LOG.error("Got a short response: {}", response.buffer.capacity());
continue;
// this is the backwardCompatibility mode in place before ZK-107
// It is for a version of the protocol in which we didn't send peer epoch
// With peer epoch and version the message became 40 bytes
boolean backCompatibility28 = (response.buffer.capacity() == 28);
// this is the backwardCompatibility mode for no version information
boolean backCompatibility40 = (response.buffer.capacity() == 40);
response.buffer.clear();
// Instantiate Notification and set its attributes
Notification n = new Notification();
int rstate = response.buffer.getInt();
long rleader = response.buffer.getLong();
long rzxid = response.buffer.getLong();
long relectionEpoch = response.buffer.getLong();
long rpeerepoch;
int version = 0x0;
if (!backCompatibility28) {
rpeerepoch = response.buffer.getLong();
if (!backCompatibility40) {
/*
* Version added in 3.4.6
*/
version = response.buffer.getInt();
} else {
LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid);
} else {
LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid);
rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
QuorumVerifier rqv = null;
// check if we have a version that includes config. If so extract config info from message.
if (version > 0x1) {
int configLength = response.buffer.getInt();
byte[] b = new byte[configLength];
response.buffer.get(b);
synchronized (self) {
try {
rqv = self.configFromString(new String(b));
QuorumVerifier curQV = self.getQuorumVerifier();
if (rqv.getVersion() > curQV.getVersion()) {
LOG.info("{} Received version: {} my version: {}",
self.getId(),
Long.toHexString(rqv.getVersion()),
Long.toHexString(self.getQuorumVerifier().getVersion()));
if (self.getPeerState() == ServerState.LOOKING) {
LOG.debug("Invoking processReconfig(), state: {}", self.getServerState());
self.processReconfig(rqv, null, null, false);
if (!rqv.equals(curQV)) {
LOG.info("restarting leader election");
self.shuttingDownLE = true;
self.getElectionAlg().shutdown();
break;
} else {
LOG.debug("Skip processReconfig(), state: {}", self.getServerState());
} catch (IOException e) {
LOG.error("Something went wrong while processing config received from {}", response.sid);
} catch (ConfigException e) {
LOG.error("Something went wrong while processing config received from {}", response.sid);
} else {
LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid);
/*
* If it is from a non-voting server (such as an observer or
* a non-voting follower), respond right away.
*/
if (!validVoter(response.sid)) {
Vote current = self.getCurrentVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
} else {
// Receive new message
LOG.debug("Receive new notification message. My id = {}", self.getId());
// State of peer that sent this message
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (rstate) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
default:
continue;
n.leader = rleader;
n.zxid = rzxid;
n.electionEpoch = relectionEpoch;
n.state = ackstate;
n.sid = response.sid;
n.peerEpoch = rpeerepoch;
n.version = version;
n.qv = rqv;
/*
* Print notification info
*/
LOG.info(
"Notification: my state:{}; n.sid:{}, n.state:{}, n.leader:{}, n.round:0x{}, "
+ "n.peerEpoch:0x{}, n.zxid:0x{}, message format version:0x{}, n.config version:0x{}",
self.getPeerState(),
n.sid,
n.state,
n.leader,
Long.toHexString(n.electionEpoch),
Long.toHexString(n.peerEpoch),
Long.toHexString(n.zxid),
Long.toHexString(n.version),
(n.qv != null ? (Long.toHexString(n.qv.getVersion())) : "0"));
/*
* If this server is looking, then send proposed leader
*/
if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
recvqueue.offer(n);
/*
* Send a notification back if the peer that sent this
* message is also looking and its logical clock is
* lagging behind.
*/
if ((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock.get())) {
Vote v = getVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
} else {
/*
* If this server is not looking, but the one that sent the ack
* is looking, then send back what it believes to be the leader.
*/
Vote current = self.getCurrentVote();
if (ackstate == QuorumPeer.ServerState.LOOKING) {
if (self.leader != null) {
if (leadingVoteSet != null) {
self.leader.setLeadingVoteSet(leadingVoteSet);
leadingVoteSet = null;
self.leader.reportLookingSid(response.sid);
LOG.debug(
"Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
self.getId(),
response.sid,
Long.toHexString(current.getZxid()),
current.getId(),
Long.toHexString(self.getQuorumVerifier().getVersion()));
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
} catch (InterruptedException e) {
LOG.warn("Interrupted Exception while waiting for new message", e);
LOG.info("WorkerReceiver is down");
// org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerSender
// thread that sends election notification messages
public void run() {
    while (!stop) {
        try {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if (m == null) {
                continue;
            }
            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
}
/**
* Called by run() once there is a new message to send.
* @param m message to send
*/
void process(ToSend m) {
    ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData);
    manager.toSend(m.sid, requestBuffer);
}
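  For reference, this is what the notification that process()/buildMsg() puts on the wire looks like, reconstructed from the field order that WorkerReceiver reads above (state, leader, zxid, electionEpoch, peerEpoch, version, config length, config bytes). The class is a made-up illustration and the values are arbitrary; only the field order and the standard Java field sizes follow the parsing code.
// Illustration only (not ZooKeeper code): the notification buffer layout
// implied by the WorkerReceiver parsing code above.
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class NotificationLayoutDemo {
    public static void main(String[] args) {
        byte[] config = "server.1=localhost:2888:3888".getBytes(StandardCharsets.UTF_8);

        ByteBuffer buf = ByteBuffer.allocate(44 + config.length);
        buf.putInt(0);              // state: 0 = LOOKING (see the switch on rstate above)
        buf.putLong(1L);            // proposed leader sid
        buf.putLong(0x100000000L);  // zxid
        buf.putLong(1L);            // electionEpoch (the logical clock)
        buf.putLong(1L);            // peerEpoch -- absent in the old 28-byte format (backCompatibility28)
        buf.putInt(0x2);            // protocol version -- absent in the older formats (backCompatibility40)
        buf.putInt(config.length);  // config length, read only when version > 0x1
        buf.put(config);

        buf.flip();
        System.out.println("notification size = " + buf.remaining() + " bytes");
    }
}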
// both threads are initialized when the Messenger is created
// org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger#Messenger
/**
* Constructor of class Messenger.
* @param manager Connection manager
*/
Messenger(QuorumCnxManager manager) {
    this.ws = new WorkerSender(manager);
    this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);

    this.wr = new WorkerReceiver(manager);
    this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}
// and the Messenger itself is created when FastLeaderElection is instantiated
// org.apache.zookeeper.server.quorum.FastLeaderElection#starter
/**
* This method is invoked by the constructor. Because it is a
* part of the starting procedure of the object that must be on
* any constructor of this class, it is probably best to keep as
* a separate method. As we have a single constructor currently,
* it is not strictly necessary to have it separate.
* @param self QuorumPeer that created this object
* @param manager Connection manager
*/
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    // initialize the WorkerSender and WorkerReceiver threads
    this.messenger = new Messenger(manager);
}
// org.apache.zookeeper.server.quorum.FastLeaderElection#FastLeaderElection
/**
* Constructor of FastLeaderElection. It takes two parameters, one
* is the QuorumPeer object that instantiated this object, and the other
* is the connection manager. Such an object should be created only once
* by each peer during an instance of the ZooKeeper service.
* @param self QuorumPeer that created this object
* @param manager Connection manager
*/
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}
// org.apache.zookeeper.server.quorum.QuorumPeer#createElectionAlgorithm
@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            // start the election port listener
            listener.start();
            // create FastLeaderElection, which starts the message sender and receiver threads
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
// org.apache.zookeeper.server.quorum.QuorumPeer#startLeaderElection
public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    // electionType defaults to 3, i.e. FastLeaderElection
    this.electionAlg = createElectionAlgorithm(electionType);
}
  To sum up: QuorumCnxManager.Listener (started from QuorumPeer#start, shown earlier) opens the listener on the election port; whenever the peer's state changes, a new round of leader election is kicked off. FastLeaderElection is the default election implementation.
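  The Messenger built above is a plain producer/consumer arrangement: the election algorithm pushes ToSend objects into sendqueue, WorkerSender drains it, and incoming notifications travel the opposite way through recvqueue. Here is a stand-alone sketch of that loop shape; the class, message type and values are invented, and only the 3-second poll timeout mirrors the code above.
// Illustration only (not ZooKeeper code): the WorkerSender loop shape --
// poll a blocking queue with a timeout, ignore empty polls, process the rest.
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class QueueWorkerDemo {

    static final LinkedBlockingQueue<String> sendqueue = new LinkedBlockingQueue<>();
    static final AtomicBoolean stop = new AtomicBoolean(false);

    public static void main(String[] args) throws Exception {
        Thread sender = new Thread(() -> {
            while (!stop.get()) {
                try {
                    String m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                    if (m == null) {
                        continue;              // nothing queued yet, keep waiting
                    }
                    process(m);
                } catch (InterruptedException e) {
                    break;
                }
            }
            System.out.println("WorkerSender is down");
        }, "WorkerSender[demo]");
        sender.setDaemon(true);
        sender.start();

        // the election algorithm side simply offers messages into the queue
        sendqueue.offer("notification: leader=1, zxid=0x100000000");
        Thread.sleep(200);

        stop.set(true);
        sender.interrupt();
        sender.join();
    }

    static void process(String m) {
        System.out.println("sending " + m);
    }
}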
3. Establishing the quorum-communication (proposal/vote) port listener
// this listener is created only after the current node has been elected Leader
// org.apache.zookeeper.server.quorum.Leader.LearnerCnxAcceptor#run
@Override
public void run() {
    try {
        while (!stop) {
            Socket s = null;
            boolean error = false;
            try {
                // blocking accept() on a plain ServerSocket
                s = ss.accept();

                // start with the initLimit, once the ack is processed
                // in LearnerHandler switch to the syncLimit
                s.setSoTimeout(self.tickTime * self.initLimit);
                s.setTcpNoDelay(nodelay);

                BufferedInputStream is = new BufferedInputStream(s.getInputStream());
                LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                fh.start();
            } catch (SocketException e) {
                error = true;
                if (stop) {
                    LOG.warn("exception while shutting down acceptor.", e);
                    // When Leader.shutdown() calls ss.close(),
                    // the call to accept throws an exception.
                    // We catch and set stop to true.
                    stop = true;
                } else {
                    throw e;
                }
            } catch (SaslException e) {
                LOG.error("Exception while connecting to quorum learner", e);
                error = true;
            } catch (Exception e) {
                error = true;
                throw e;
            } finally {
                // Don't leak sockets on errors
                if (error && s != null && !s.isClosed()) {
                    try {
                        s.close();
                    } catch (IOException e) {
                        LOG.warn("Error closing socket", e);
                    }
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Exception while accepting follower", e);
        handleException(this.getName(), e);
    }
}
// the quorum-communication port only starts listening after the election has completed; the socket is created in the Leader constructor
// org.apache.zookeeper.server.quorum.Leader#Leader
Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
    this.self = self;
    this.proposalStats = new BufferStats();
    try {
        if (self.shouldUsePortUnification() || self.isSslQuorum()) {
            boolean allowInsecureConnection = self.shouldUsePortUnification();
            if (self.getQuorumListenOnAllIPs()) {
                ss = new UnifiedServerSocket(
                    self.getX509Util(),
                    allowInsecureConnection,
                    self.getQuorumAddress().getPort());
            } else {
                ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection);
            }
        } else {
            // still a plain ServerSocket, i.e. blocking I/O
            if (self.getQuorumListenOnAllIPs()) {
                ss = new ServerSocket(self.getQuorumAddress().getPort());
            } else {
                ss = new ServerSocket();
            }
        }
        ss.setReuseAddress(true);
        // the port is the quorum-communication port parsed from the config earlier;
        // after binding, the acceptor waits for learner connections
        if (!self.getQuorumListenOnAllIPs()) {
            ss.bind(self.getQuorumAddress());
        }
    } catch (BindException e) {
        if (self.getQuorumListenOnAllIPs()) {
            LOG.error("Couldn't bind to port {}", self.getQuorumAddress().getPort(), e);
        } else {
            LOG.error("Couldn't bind to {}", self.getQuorumAddress(), e);
        }
        throw e;
    }
    this.zk = zk;
}
// A Follower, by contrast, actively connects to the Leader; all subsequent traffic between the two flows over that connection
// org.apache.zookeeper.server.quorum.Follower#followLeader
/**
* the main method called by the follower to follow the leader
* @throws InterruptedException
*/
void followLeader() throws InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
long connectionTime = 0;
boolean completedSync = false;
try {
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
// find the Leader and connect to it
QuorumServer leaderServer = findLeader();
try {
connectToLeader(leaderServer.addr, leaderServer.hostname);
connectionTime = System.currentTimeMillis();
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
if (self.isReconfigStateChange()) {
throw new Exception("learned about role change");
//check to see if the leader zxid is lower than ours
//this should never happen but is just a safety check
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
if (newEpoch < self.getAcceptedEpoch()) {
LOG.error("Proposed leader epoch "
+ ZxidUtils.zxidToString(newEpochZxid)
+ " is less than our accepted epoch "
+ ZxidUtils.zxidToString(self.getAcceptedEpoch()));
throw new IOException("Error: Epoch of leader is lower");
long startTime = Time.currentElapsedTime();
try {
self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
syncWithLeader(newEpochZxid);
self.setZabState(QuorumPeer.ZabState.BROADCAST);
completedSync = true;
} finally {
long syncTime = Time.currentElapsedTime() - startTime;
ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
if (self.getObserverMasterPort() > 0) {
LOG.info("Starting ObserverMaster");
om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
om.start();
} else {
om = null;
// create a reusable packet to reduce gc impact
QuorumPacket qp = new QuorumPacket();
// loop here indefinitely, reading packets from the leader (blocking reads on the socket)
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);
} catch (Exception e) {
LOG.warn("Exception when following the leader", e);
closeSocket();
// clear pending revalidations
pendingRevalidations.clear();
} finally {
if (om != null) {
om.stop();
zk.unregisterJMX(this);
if (connectionTime != 0) {
long connectionDuration = System.currentTimeMillis() - connectionTime;
LOG.info(
"Disconnected from leader (with address: {}). Was connected for {}ms. Sync state: {}",
leaderAddr,
connectionDuration,
completedSync);
messageTracker.dumpToLog(leaderAddr.toString());
// org.apache.zookeeper.server.quorum.QuorumPeer#makeLeader
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
    return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}
// The QuorumPeer thread drives the peer's state machine: it keeps watching the leader situation, and whenever the leader changes it triggers a new election and rebalances the ensemble.
// Every node opens the election port listener, but the quorum-communication listener is opened only by the Leader; the other nodes merely connect to it.
// org.apache.zookeeper.server.quorum.QuorumPeer
@Override
public void run() {
updateThreadName();
LOG.debug("Starting quorum peer");
try {
jmxQuorumBean = new QuorumBean(this);
MBeanRegistry.getInstance().register(jmxQuorumBean, null);
for (QuorumServer s : getView().values()) {
ZKMBeanInfo p;
if (getId() == s.id) {
p = jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxLocalPeerBean = null;
} else {
RemotePeerBean rBean = new RemotePeerBean(this, s);
try {
MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
jmxRemotePeerBean.put(s.id, rBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxQuorumBean = null;
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
};
try {
roZkMgr.start();
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
} else {
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
break;
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
observer.shutdown();
setObserver(null);
updateServerState();
// Add delay jitter before we switch to LOOKING
// state to reduce the load of ObserverMaster
if (isRunning()) {
Observer.waitForObserverElectionDelay();
break;
case FOLLOWING:
// once the current state is determined, exceptions are handled in place until the state changes
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
follower.shutdown();
setFollower(null);
updateServerState();
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
updateServerState();
break;
} finally {
LOG.warn("QuorumPeer main thread exited");
MBeanRegistry instance = MBeanRegistry.getInstance();
instance.unregister(jmxQuorumBean);
instance.unregister(jmxLocalPeerBean);
for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
instance.unregister(remotePeerBean);
jmxQuorumBean = null;
jmxLocalPeerBean = null;
jmxRemotePeerBean = null;
  To sum up: this builds on the QuorumCnxManager.Listener started by the election machinery; only after a node has been elected Leader does it open the listener on the quorum-communication (proposal/vote) port.
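  As a compact illustration of the blocking-I/O pattern used on this port -- bind, block in accept(), hand each accepted socket to its own handler thread, as LearnerCnxAcceptor and LearnerHandler do above -- here is a stand-alone sketch. The class name is invented and port 2888 is just the quorum port from the sample configuration; a real LearnerHandler of course speaks the leader-follower protocol rather than reading a single byte.
// Illustration only (not ZooKeeper code): the LearnerCnxAcceptor shape --
// bind, block in accept(), spawn one handler thread per learner connection.
import java.io.BufferedInputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class MiniLearnerAcceptor {
    public static void main(String[] args) throws Exception {
        ServerSocket ss = new ServerSocket();
        ss.setReuseAddress(true);
        ss.bind(new InetSocketAddress(2888));             // quorum port from the sample config

        while (true) {
            Socket s = ss.accept();                       // blocks until a follower connects
            s.setTcpNoDelay(true);
            BufferedInputStream is = new BufferedInputStream(s.getInputStream());
            Thread handler = new Thread(() -> {
                try {
                    int first = is.read();                // a real LearnerHandler parses packets here
                    System.out.println("first byte from " + s.getRemoteSocketAddress() + ": " + first);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try { s.close(); } catch (Exception ignore) { }
                }
            }, "LearnerHandler-demo");
            handler.start();
        }
    }
}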
  Overall, ZooKeeper's internals are purely asynchronous: work is handed off from one thread to another through queues. On the I/O side, however, note that only the client-facing listener uses NIO; all of the server-to-server communication uses blocking I/O (BIO), because NIO is not automatically better than BIO -- it depends on the scenario.
  For a network application, a rough grasp of the protocol stack (TCP/IP, the OSI model) is enough; but understanding how a TCP/UDP socket is handed over to the application layer is the real starting point for understanding any network application. Without that, you are only watching from the sidelines.
  Once you understand how ZooKeeper establishes its several port listeners, the later, more tangled server-to-server interactions become much easier to follow. (Of course, if you never care about the details, it may make no difference to you.)