利用过年这几天的一些空余时间,把《从PAXOS到ZOOKEEPER分布式一致性原理与实践》看了一遍。对ZooKeeper有了一个大致的了解。ZooKeeper作为Chubby的开源实现,在当前的分布式生产环境中有着广泛的应用。使用ZooKeeper可以方便的实现Dynamic DNS、微服务架构中的服务路由以及为其他分布式系统提供支撑(如Kafka, HBase)。
作为个人关于ZooKeeper的第一篇blog,我准备分析一下ZooKeeper的事务流程。事务处理是ZooKeeper的一大重要功能。在ZooKeeper事务流程实现中,所有的事务请求都需要从Leader Server协调完成。因此,从ZooKeeper的Leader Sever来分析事务流程更为简单。
首先分析LeaderZooKeeperServer类,它是ZooKeeperServer的子类,一个该类型的实例其实就对应一个leader服务器。这个类实现了一个方法,其中包含了重要信息。
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
finalProcessor, getLeader().toBeApplied);
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
Long.toString(getServerId()), false);
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor);
proposalProcessor.initialize();
firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
这段代码其实内容很简单,却非常重要。这里采用了类似于“责任链”的设计模式,该函数的作用就是把处理请求的逻辑链建立起来。“责任链”的确是建立起来了,那么这条链是何时被触发的呢?LeaderZooKeeperServer是QuorumZooKeeperServer的直接子类。该类有一个QuorumPeer类型的成员self,而self有有一个Leader类型的成员leader。这个leader类型的变量完成了网络交互方面的工作,也是触发前文中“责任链”的地方。对于Leader的实现,我们有必要再看一段代码,首先是Leader的构造函数。
Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
this.self = self;
try {
if (self.getQuorumListenOnAllIPs()) {
ss = new ServerSocket(self.getQuorumAddress().getPort());
} else {
ss = new ServerSocket();
}
ss.setReuseAddress(true);
if (!self.getQuorumListenOnAllIPs()) {
ss.bind(self.getQuorumAddress());
}
} catch (BindException e) {
if (self.getQuorumListenOnAllIPs()) {
LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e);
} else {
LOG.error("Couldn't bind to " + self.getQuorumAddress(), e);
}
throw e;
}
this.zk=zk;
}
从这里就看出来了,Leader其实就是一个服务器,通过服务器套接字接受客户端和Follower的连接。涉及到套接字通信,其实重点关注收发的逻辑就好。对于接收和发送功能,Leader主要通过LearnerHandler来实现。首先来看接收功能吧!简略地贴点代码
while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
/**略**/
switch (qp.getType()) {
/**略**/
case Leader.REQUEST:
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
if(type == OpCode.sync){
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
leader.zk.submitRequest(si);
break;
}
/**略**/
}
LearnerHandler其实是Thread的子类,而上面的这段代码其实是它的run方法。这段代码的死循环就是请求处理的核心代码。如果QuorumPacket的类型是Request,那么将对报文进行简单的处理,然后提交给leader成员所指向的LeaderZooKeeperServer。而这个submitRequest()函数才是真正地实现了“责任链”的触发。而这个提交请求的函数做的事情也很简单,就是把request放到一个请求队列中。
Leader在ProposalRequestProcessor中会根据事务性请求生成对应的Proposal,同时把这个提议广播出去。在更进一步之前,是需要先等待followers的应答的。对于ack的应答的处理逻辑也在LearnerHandler中,而LearnerHandler也只是简单的把ack包提交给Leader的processAck()函数。processAck()函数的代码片段如下:
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
if (outstandingProposals.size() == 0) {
return;
}
Proposal p = outstandingProposals.get(zxid);
p.ackSet.add(sid);
if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
outstandingProposals.remove(zxid);
if (p.request != null) {
toBeApplied.add(p);
}
commit(zxid);
inform(p);
zk.commitProcessor.commit(p.request);
if(pendingSyncs.containsKey(zxid)){
for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
sendSync(r);
}
}
}
}
这个函数的重要代码都被我筛选出来了,看起来并不复杂。从outstandingProposal中找到对应的proposal,然后把follow服务器的sid添加到这个proposal的ackSet中。然后判断是否已经有过半的服务器通过了该proposal,如果条件为真,那么就把这个proposal提交给commitProcessor。
而后的故事就在下一篇博客中再分析了。
发表回复