IOTXING

记录技术学习之路

0%

zookeeper源码学习-QuorumPeer

  • 分布式节点的相关代码

基础变量

1
2
3
4
5
6
7
8
9
10
11
// Quorum verifier currently in effect for this peer
private QuorumVerifier quorumVerifier;

// Most recently seen quorum verifier; null until one has been recorded
private QuorumVerifier lastSeenQuorumVerifier = null;

// Name of the file that stores the current epoch
public static final String CURRENT_EPOCH_FILENAME = "currentEpoch";

// Name of the file that stores the accepted epoch
public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";

QuorumPeer 新建节点

1
2
3
4
5
6
7
8
/**
 * Creates a quorum peer: names it "QuorumPeer" via the superclass constructor,
 * allocates the stats object, the remote-peer JMX bean map, the admin server
 * and the X509 utility, and finally runs {@code initialize()}.
 *
 * @throws SaslException declared because {@code initialize()} is declared to throw it
 */
public QuorumPeer() throws SaslException {
super("QuorumPeer"); // creates the underlying thread and names it (per the original author's note)
quorumStats = new QuorumStats(this); // stats object bound to this peer
jmxRemotePeerBean = new HashMap<Long, RemotePeerBean>(); // remote peer beans keyed by Long (populated with server ids in run())
adminServer = AdminServerFactory.createAdminServer();
x509Util = createX509Util();
initialize(); // set up the quorum auth server and learner
}

initialize 初始化认证服务器和learner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Initializes the quorum authentication server and learner.
 *
 * <p>When quorum SASL authentication is disabled, the {@code Null*} auth
 * implementations are installed. Otherwise the hostnames of every server in
 * the current view are collected and handed to the SASL auth server, and a
 * SASL auth learner is created alongside it.
 *
 * @throws SaslException declared by this method; presumably raised by the SASL
 *         auth constructors (not visible here)
 */
public void initialize() throws SaslException {
    if (!isQuorumSaslAuthEnabled()) {
        // SASL is off: fall back to the Null* auth implementations.
        authServer = new NullQuorumAuthServer();
        authLearner = new NullQuorumAuthLearner();
        return;
    }

    // Collect the hostname of every server in the current view.
    Set<String> authorizedHosts = new HashSet<String>();
    for (QuorumServer server : getView().values()) {
        authorizedHosts.add(server.hostname);
    }

    authServer = new SaslQuorumAuthServer(
        isQuorumServerSaslAuthRequired(),
        quorumServerLoginContext,
        authorizedHosts);
    authLearner = new SaslQuorumAuthLearner(
        isQuorumLearnerSaslAuthRequired(),
        quorumServicePrincipal,
        quorumLearnerLoginContext);
}

start 启动函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Starts this quorum peer.
 *
 * <p>Order of operations: verify that {@code myid} is part of the current view,
 * load the on-disk database, start the server connection factory, start the
 * admin server (a failure here is logged and tolerated), start leader election
 * and the JVM pause monitor, and finally start the superclass (which runs
 * {@code run()}).
 *
 * @throws RuntimeException if {@code myid} is not present in the peer list
 */
public synchronized void start() {
    // myid is assigned in QuorumPeerMain (per the original author's note).
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        // Best effort: the peer keeps starting without the admin server.
        // The exception is reported through the logger only; the former
        // System.out.println(e) duplicated it on stdout outside the logging setup.
        LOG.warn("Problem starting AdminServer", e);
    }
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}

setQuorumVerifier 设置Quorum验证器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * Installs {@code qv} as the current quorum verifier if it is strictly newer
 * than the one in effect.
 *
 * @param qv the candidate quorum verifier
 * @param writeToDisk whether the new configuration should also be persisted
 *        (only done when {@code configFilename} is set)
 * @return the verifier already in effect when its version is greater than or
 *         equal to {@code qv}'s (nothing is changed in that case); otherwise
 *         the previously installed verifier, which may be {@code null}
 */
public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) {
    synchronized (QV_LOCK) { // all verifier updates are serialized on QV_LOCK
        // Keep the current verifier when it is at least as new as the incoming one (>=, not just >).
        if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) {
            // this is normal. For example - server found out about new config through FastLeaderElection gossiping
            // and then got the same config in UPTODATE message so its already known
            LOG.debug(
                "{} setQuorumVerifier called with known or old config {}. Current version: {}",
                getId(),
                qv.getVersion(),
                quorumVerifier.getVersion());
            return quorumVerifier;
        }

        QuorumVerifier prevQV = quorumVerifier;
        quorumVerifier = qv; // the incoming verifier becomes the current one

        // Advance the last-seen verifier when none is recorded or the recorded one is older.
        if (lastSeenQuorumVerifier == null || (qv.getVersion() > lastSeenQuorumVerifier.getVersion())) {
            lastSeenQuorumVerifier = qv;
        }

        if (writeToDisk) {
            // some tests initialize QuorumPeer without a static config file
            if (configFilename != null) {
                try {
                    String dynamicConfigFilename = makeDynamicConfigFilename(qv.getVersion());
                    QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename, qv, false);
                    QuorumPeerConfig.editStaticConfig(configFilename, dynamicConfigFilename, needEraseClientInfoFromStaticConfig());
                } catch (IOException e) {
                    // The failure is logged and the in-memory update still proceeds.
                    LOG.error("Error persisting quorum config to disk", e);
                }
            } else {
                LOG.info("writeToDisk == true but configFilename == null");
            }
        }

        // The incoming config matches the last-seen one: remove the file named by
        // getNextDynamicConfigFilename().
        if (qv.getVersion() == lastSeenQuorumVerifier.getVersion()) {
            QuorumPeerConfig.deleteFile(getNextDynamicConfigFilename());
        }

        // Refresh this peer's own addresses if it is a member of the new config.
        QuorumServer qs = qv.getAllMembers().get(getId());
        if (qs != null) {
            setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
        }
        updateObserverMasterList();
        return prevQV;
    }
}

loadDataBase 读取本地数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
 * Loads the local database and restores the current and accepted epochs.
 *
 * <p>Each epoch is read from its file. When the file does not exist, the epoch
 * encoded in the last processed zxid is used as the default and written back
 * to that file. Two consistency checks follow: the current epoch must not be
 * older than the epoch of the last processed zxid, and the accepted epoch must
 * not be less than the current epoch.
 *
 * @throws RuntimeException wrapping any {@link IOException} raised while
 *         loading or by a failed consistency check
 */
private void loadDataBase() {
    try {
        // Load the locally stored data (snapshot content, per the original note).
        zkDb.loadDataBase();

        // Derive the epoch from the most recently processed zxid.
        final long lastZxid = zkDb.getDataTree().lastProcessedZxid;
        final long zxidEpoch = ZxidUtils.getEpochFromZxid(lastZxid);

        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch (FileNotFoundException missingFile) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = zxidEpoch;
            LOG.info(
                "{} not found! Creating with a reasonable default of {}. "
                    + "This should only happen when you are upgrading your installation",
                CURRENT_EPOCH_FILENAME,
                currentEpoch);
            // No epoch file yet: persist the epoch taken from the zxid.
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }

        // A stored current epoch older than the zxid's epoch indicates lost data.
        if (epochIsNewer(zxidEpoch, currentEpoch)) {
            final String staleCurrentEpoch = "The current epoch, "
                + ZxidUtils.zxidToString(currentEpoch)
                + ", is older than the last zxid, "
                + lastZxid;
            throw new IOException(staleCurrentEpoch);
        }

        try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        } catch (FileNotFoundException missingFile) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            acceptedEpoch = zxidEpoch;
            LOG.info(
                "{} not found! Creating with a reasonable default of {}. "
                    + "This should only happen when you are upgrading your installation",
                ACCEPTED_EPOCH_FILENAME,
                acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
        }

        if (acceptedEpoch < currentEpoch) {
            final String staleAcceptedEpoch = "The accepted epoch, "
                + ZxidUtils.zxidToString(acceptedEpoch)
                + " is less than the current epoch, "
                + ZxidUtils.zxidToString(currentEpoch);
            throw new IOException(staleAcceptedEpoch);
        }
    } catch (IOException loadFailure) {
        LOG.error("Unable to load database on disk", loadFailure);
        throw new RuntimeException("Unable to run quorum server ", loadFailure);
    }
}

/** Returns true when {@code candidate} is strictly greater than {@code reference}. */
private static boolean epochIsNewer(long candidate, long reference) {
    return candidate > reference;
}

startLeaderElection 开始leader的选举

创建选举

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * Builds the leader election implementation for the given algorithm id.
 *
 * <p>Only algorithm 3 is supported: a new connection manager is created and
 * published through {@code qcmRef} (halting any previously set manager), its
 * listener is started, and a started {@link FastLeaderElection} is returned.
 *
 * @param electionAlgorithm numeric id of the election algorithm
 * @return the started election, or {@code null} when the connection manager
 *         has no listener or (with assertions disabled) the id is unknown
 * @throws UnsupportedOperationException for algorithm ids 1 and 2
 */
protected Election createElectionAlgorithm(int electionAlgorithm) {
    //TODO: use a factory rather than branching on the id
    if (electionAlgorithm == 1) {
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    }
    if (electionAlgorithm == 2) {
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    }
    if (electionAlgorithm != 3) {
        // Unknown id: trips the assertion when -ea is on, otherwise yields null.
        assert false;
        return null;
    }

    // Create the connection manager and swap it in, halting any previous one.
    QuorumCnxManager manager = createCnxnManager();
    QuorumCnxManager previousManager = qcmRef.getAndSet(manager);
    if (previousManager != null) {
        LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
        previousManager.halt();
    }

    QuorumCnxManager.Listener connectionListener = manager.listener;
    if (connectionListener == null) {
        LOG.error("Null listener when initializing cnx manager");
        return null;
    }

    connectionListener.start();
    FastLeaderElection election = new FastLeaderElection(this, manager);
    election.start();
    return election;
}

Run 轮询的逻辑

根据当前peer的状态,进行不同的处理

  • Looking
    • 如果当前是Looking状态,会有几种可能
      • 集群刚启动,暂时还没有选出 leader
      • 节点刚启动,等待与 leader 同步

如果配置中开启了只读模式(系统属性 readonlymode.enabled 为 true),当 zk 集群整体无法对外提供服务时,节点会先等待一段宽限期(max(2000, tickTime) 毫秒);若此后仍处于 LOOKING 状态,就启动只读服务器对外提供只读服务,但不能处理写入。

调用选举策略中的选举方法,也就是下面的 lookForLeader

FastLeaderElection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/**
 * Main loop of the quorum peer.
 *
 * <p>First registers JMX beans for the quorum, the local peer and every remote
 * peer in the view (registration failures are logged and tolerated). Then,
 * while {@code running} is true, dispatches on the current peer state:
 * LOOKING runs leader election, OBSERVING / FOLLOWING / LEADING run the
 * corresponding role until it returns or throws, after which the role object
 * is shut down and the server state is updated. When the loop exits, all JMX
 * beans are unregistered and the bean references are cleared.
 */
public void run() {
updateThreadName();

LOG.debug("Starting quorum peer");
// Phase 1: JMX registration. Any failure is logged and does not stop the peer.
try {
jmxQuorumBean = new QuorumBean(this);
MBeanRegistry.getInstance().register(jmxQuorumBean, null);
for (QuorumServer s : getView().values()) {
ZKMBeanInfo p;
if (getId() == s.id) {
// This view entry is the local server: register a LocalPeerBean.
p = jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxLocalPeerBean = null;
}
} else {
// Any other view entry is a remote server: register a RemotePeerBean
// and remember it by server id so it can be unregistered on exit.
RemotePeerBean rBean = new RemotePeerBean(this, s);
try {
MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
jmxRemotePeerBean.put(s.id, rBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxQuorumBean = null;
}

// Phase 2: state machine driven by getPeerState().
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
ServerMetrics.getMetrics().LOOKING_COUNT.add(1);

// Read-only mode is switched on through the "readonlymode.enabled" system property.
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");

// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
// Only start the read-only server if the peer is still LOOKING after the grace period.
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
reconfigFlagClear();
// Leader election was shut down earlier: clear the flag and start it again.
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
// Run the election strategy's lookForLeader() and record the resulting vote.
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
// Same election steps as above, without the read-only server handling.
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
// NOTE(review): unlike the LEADING branch there is no null guard here; if
// makeObserver() throws, observer may still be null at this point - confirm.
observer.shutdown();
setObserver(null);
updateServerState();

// Add delay jitter before we switch to LOOKING
// state to reduce the load of ObserverMaster
if (isRunning()) {
Observer.waitForObserverElectionDelay();
}
}
break;
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
// NOTE(review): no null guard here either; if makeFollower() throws,
// follower may still be null at this point - confirm.
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
// lead() returned without throwing: clear the leader so the finally block skips the forced shutdown.
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
// A leader is still set only when the try block did not reach setLeader(null).
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
}
}
} finally {
// Phase 3: the loop has ended (normally or via an exception); unregister every JMX bean.
LOG.warn("QuorumPeer main thread exited");
MBeanRegistry instance = MBeanRegistry.getInstance();
instance.unregister(jmxQuorumBean);
instance.unregister(jmxLocalPeerBean);

for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
instance.unregister(remotePeerBean);
}

jmxQuorumBean = null;
jmxLocalPeerBean = null;
jmxRemotePeerBean = null;
}
}