IOTXING

Notes from a technical learning journey


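Code responsible for leader election

Election flow

  1. Check the node's current state and whether the election should stop.
  2. Take a vote from the receive queue and validate it.
  3. If the election epoch differs, compare the incoming vote with the node's own vote, update the node's own vote if needed, and notify the other nodes.
  4. If the election epoch matches, store the vote in recvset, keyed by the sender's id, with a vote built from the sender's vote information as the value. When the loop ends, recvset holds the id of every participant together with its vote.
  5. Build a vote tracker from recvset and the node's own proposed leader. The tracker tallies the votes: keyed by the proposed leader id, it collects the voting nodes in a set and then checks whether the size of that set exceeds half of the participants.
  6. Once a quorum is reached, try once more to take a vote from the receive queue to see whether a new vote has arrived. If one has, restart the loop. If not, update the node's state to reflect the chosen leader and exit the election.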
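
The tallying in steps 4 and 5 above can be sketched roughly as follows. This is a simplified illustration, not ZooKeeper's actual code: the class `VoteTallySketch`, the method `hasQuorum`, and the use of a plain `Map<Long, Long>` for recvset (sender id to proposed leader id) are assumptions made for the example, and the real tracker works with full vote objects rather than bare ids.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class VoteTallySketch {

    // recvset: sender id -> leader id that sender voted for (hypothetical simplification).
    // Returns true if more than half of the voters back proposedLeader.
    static boolean hasQuorum(Map<Long, Long> recvset, long proposedLeader, int voterCount) {
        // Group the senders by the leader they voted for.
        Map<Long, Set<Long>> supporters = new HashMap<>();
        for (Map.Entry<Long, Long> e : recvset.entrySet()) {
            supporters.computeIfAbsent(e.getValue(), k -> new HashSet<>()).add(e.getKey());
        }
        Set<Long> acks = supporters.getOrDefault(proposedLeader, Collections.emptySet());
        return acks.size() > voterCount / 2;
    }

    public static void main(String[] args) {
        Map<Long, Long> recvset = new HashMap<>();
        recvset.put(1L, 3L);
        recvset.put(2L, 3L);
        recvset.put(3L, 3L);
        recvset.put(4L, 5L);
        System.out.println(hasQuorum(recvset, 3L, 5)); // true: 3 of 5 voters chose server 3
        System.out.println(hasQuorum(recvset, 5L, 5)); // false: only 1 of 5 chose server 5
    }
}
```

Keying the tally by leader id means each sender is counted at most once per candidate, because recvset keeps only the latest vote per sender.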

Object field definitions

private final int version;

private final long id;

private final long zxid;

private final long electionEpoch;

private final long peerEpoch;
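
These fields are what an election compares when choosing between two votes. A minimal sketch of one such ordering, assuming the comparison goes by `peerEpoch` first, then `zxid`, then server `id`. The class `VoteSketch` and the method `supersedes` are hypothetical names, the field meanings in the comments are assumptions, and the exact rule should be checked against the ZooKeeper source.

```java
final class VoteSketch {
    final long id;        // assumed: server id of the proposed leader
    final long zxid;      // assumed: last transaction id of the proposed leader
    final long peerEpoch; // assumed: epoch of the proposed leader

    VoteSketch(long id, long zxid, long peerEpoch) {
        this.id = id;
        this.zxid = zxid;
        this.peerEpoch = peerEpoch;
    }

    // True if this vote should replace 'other' under the assumed ordering:
    // higher peerEpoch wins, then higher zxid, then higher server id.
    boolean supersedes(VoteSketch other) {
        if (peerEpoch != other.peerEpoch) {
            return peerEpoch > other.peerEpoch;
        }
        if (zxid != other.zxid) {
            return zxid > other.zxid;
        }
        return id > other.id;
    }

    public static void main(String[] args) {
        VoteSketch mine = new VoteSketch(1, 5, 1);
        VoteSketch incoming = new VoteSketch(3, 5, 1);
        // Same epoch and zxid, so the higher server id decides.
        System.out.println(incoming.supersedes(mine)); // true
    }
}
```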

Helpers for zxid arithmetic: the high 32 bits store the epoch id and the low 32 bits store the transaction counter.

public class ZxidUtils {

    public static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }

    public static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    public static String zxidToString(long zxid) {
        return Long.toHexString(zxid);
    }

}
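
A short usage sketch for the helpers above. The three methods are re-declared in a local class, `ZxidDemo` (a hypothetical name), so that the example compiles on its own; the epoch and counter values are arbitrary.

```java
final class ZxidDemo {

    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }

    static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 7);                   // epoch 5, 7th transaction
        System.out.println(Long.toHexString(zxid));   // 500000007
        System.out.println(getEpochFromZxid(zxid));   // 5
        System.out.println(getCounterFromZxid(zxid)); // 7
    }
}
```

Because the epoch sits in the high bits, any zxid from a later epoch compares greater than every zxid from an earlier one, regardless of the counter.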

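Quorum verifier. A quorum is a mechanism in distributed systems that defines whether a given request can be considered confirmed.

ZooKeeper's default implementation is QuorumMaj: a vote passes once more than half of the voters agree.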
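
The majority rule itself fits in a few lines. The following is an illustration of the idea only, not the QuorumMaj source; `MajoritySketch` and its `containsQuorum` method are names chosen for this example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class MajoritySketch {
    private final int half; // integer half of the number of voting members

    MajoritySketch(int votingMembers) {
        this.half = votingMembers / 2;
    }

    // A set of acknowledging server ids forms a quorum when it is
    // strictly larger than half of the voting members.
    boolean containsQuorum(Set<Long> ackSet) {
        return ackSet.size() > half;
    }

    public static void main(String[] args) {
        MajoritySketch five = new MajoritySketch(5);
        System.out.println(five.containsQuorum(new HashSet<>(Arrays.asList(1L, 2L))));     // false
        System.out.println(five.containsQuorum(new HashSet<>(Arrays.asList(1L, 2L, 3L)))); // true
    }
}
```

With 4 voters the threshold is still 3, which is why an even-sized ensemble tolerates no more failures than the next smaller odd-sized one.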


  • Represents the state of a node

States

  • UNKNOWN_STATE: unknown
  • LOOKING_STATE: looking for a leader
  • LEADING_STATE: leading
  • FOLLOWING_STATE: following
  • OBSERVING_STATE: observing

  • Code for a node in the distributed ensemble

Basic variables

// Quorum verifier
private QuorumVerifier quorumVerifier;

// Most recently seen quorum verifier
private QuorumVerifier lastSeenQuorumVerifier = null;

// Name of the file that stores the current epoch
public static final String CURRENT_EPOCH_FILENAME = "currentEpoch";

// Name of the file that stores the accepted epoch
public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";

Startup entry point for the whole project

Relationship diagram

https://iotxing-1253163150.cos.ap-shanghai.myqcloud.com/ZK/QuorumPeerMain.jpg

initializeAndRun

The main body of main: it initializes the configuration, starts the purge thread, and runs the server.

protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig(); // read the node's configuration
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
        config.getDataDir(),
        config.getDataLogDir(),
        config.getSnapRetainCount(),
        config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        // there is only one server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

runFromConfig

In a distributed deployment, the node is started through this function. A single-node deployment is started through ZooKeeperServerMain.main(args) instead.

Full code

public class QuorumPeerMain {

    private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMain.class);

    private static final String USAGE = "Usage: QuorumPeerMain configfile";

    protected QuorumPeer quorumPeer;

    /**
     * To start the replicated server specify the configuration file name on
     * the command line.
     * @param args path to the configfile
     */
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
        } catch (IllegalArgumentException e) {
            LOG.error("Invalid arguments, exiting abnormally", e);
            LOG.info(USAGE);
            System.err.println(USAGE);
            ZKAuditProvider.addServerStartFailureAuditLog();
            ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());
        } catch (ConfigException e) {
            LOG.error("Invalid config, exiting abnormally", e);
            System.err.println("Invalid config, exiting abnormally");
            ZKAuditProvider.addServerStartFailureAuditLog();
            ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());
        } catch (DatadirException e) {
            LOG.error("Unable to access datadir, exiting abnormally", e);
            System.err.println("Unable to access datadir, exiting abnormally");
            ZKAuditProvider.addServerStartFailureAuditLog();
            ServiceUtils.requestSystemExit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());
        } catch (AdminServerException e) {
            LOG.error("Unable to start AdminServer, exiting abnormally", e);
            System.err.println("Unable to start AdminServer, exiting abnormally");
            ZKAuditProvider.addServerStartFailureAuditLog();
            ServiceUtils.requestSystemExit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());
        } catch (Exception e) {
            LOG.error("Unexpected exception, exiting abnormally", e);
            ZKAuditProvider.addServerStartFailureAuditLog();
            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
        }
        LOG.info("Exiting normally");
        ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue());
    }

    protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }

        // Start and schedule the purge task
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(),
            config.getDataLogDir(),
            config.getSnapRetainCount(),
            config.getPurgeInterval());
        purgeMgr.start();

        if (args.length == 1 && config.isDistributed()) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
            // there is only one server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);
        }
    }

    public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException e) {
            LOG.warn("Unable to register log4j JMX control", e);
        }

        LOG.info("Starting quorum peer");
        MetricsProvider metricsProvider;
        try {
            metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
                config.getMetricsProviderClassName(),
                config.getMetricsProviderConfiguration());
        } catch (MetricsProviderLifeCycleException error) {
            throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
        }
        try {
            ServerMetrics.metricsProviderInitialized(metricsProvider);
            ServerCnxnFactory cnxnFactory = null;
            ServerCnxnFactory secureCnxnFactory = null;

            if (config.getClientPortAddress() != null) {
                cnxnFactory = ServerCnxnFactory.createFactory();
                cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
            }

            if (config.getSecureClientPortAddress() != null) {
                secureCnxnFactory = ServerCnxnFactory.createFactory();
                secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
            }

            quorumPeer = getQuorumPeer();
            quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
            quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
            quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
            //quorumPeer.setQuorumPeers(config.getAllMembers());
            quorumPeer.setElectionType(config.getElectionAlg());
            quorumPeer.setMyid(config.getServerId());
            quorumPeer.setTickTime(config.getTickTime());
            quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
            quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
            quorumPeer.setInitLimit(config.getInitLimit());
            quorumPeer.setSyncLimit(config.getSyncLimit());
            quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
            quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
            quorumPeer.setConfigFileName(config.getConfigFilename());
            quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
            quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
            quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); // add the ids to the map
            if (config.getLastSeenQuorumVerifier() != null) {
                quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
            }
            quorumPeer.initConfigInZKDatabase();
            quorumPeer.setCnxnFactory(cnxnFactory);
            quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
            quorumPeer.setSslQuorum(config.isSslQuorum());
            quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
            quorumPeer.setLearnerType(config.getPeerType());
            quorumPeer.setSyncEnabled(config.getSyncEnabled());
            quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
            if (config.sslQuorumReloadCertFiles) {
                quorumPeer.getX509Util().enableCertFileReloading();
            }
            quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
            quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
            quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());

            // sets quorum sasl authentication configurations
            quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
            if (quorumPeer.isQuorumSaslAuthEnabled()) {
                quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
                quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
                quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
                quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
                quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
            }
            quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
            quorumPeer.initialize();

            if (config.jvmPauseMonitorToRun) {
                quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
            }

            quorumPeer.start();
            ZKAuditProvider.addZKStartStopAuditLog();
            quorumPeer.join();
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Quorum Peer interrupted", e);
        } finally {
            if (metricsProvider != null) {
                try {
                    metricsProvider.stop();
                } catch (Throwable error) {
                    LOG.warn("Error while stopping metrics", error);
                }
            }
        }
    }

    // @VisibleForTesting
    protected QuorumPeer getQuorumPeer() throws SaslException {
        return new QuorumPeer();
    }

}