【ZooKeeper】ZooKeeper

Apache ZooKeeper - Wikipedia

ZooKeeper 简介

ZooKeeper 是一个开源的分布式的,为分布式框架提供协调服务的Apache项目。ZooKeeper安装见Linux 开发环境配置文档

image-20210824153402010

本文档参考尚硅谷ZooKeeper教程:https://www.bilibili.com/video/BV1to4y1C7gw?p=2

ZooKeeper 工作机制

ZooKeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。ZooKeeper服务器采用NIO与客户端进行通讯

ZooKeeper = 文件系统 + 通知机制

image-20210824154028992

ZooKeeper 特点

  • ZooKeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。
  • 集群中只要有半数以上节点存活,ZooKeeper集群就能正常服务。所以ZooKeeper适合安装奇数台服务器。
  • 全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。
  • 更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行。
  • 数据更新原子性:一次数据更新要么成功,要么失败。
  • 实时性:在一定时间范围内,Client能读到最新数据。

image-20210824153726239

ZooKeeper 应用场景

统一命名服务

image-20210824153921084

统一配置管理

image-20210824153938000

统一集群管理

image-20210824154010757

服务器动态上下线

image-20210824154028992

软负载均衡

image-20210824154140428

CAP 理论

image-20210824171128237

ZooKeeper保证的是CP

  • ZooKeeper不能保证每次服务请求的可用性,但需要保证数据的一致性。(注:在极端环境下,ZooKeeper可能会丢弃一些请求,消费者程序需要 重新请求才能获得结果)。所以说,ZooKeeper不能保证服务可用性。
  • 例如进行Leader选举时集群都是不可用。

ZooKeeper 选举机制

第一次启动时

根据SID决定,投票都交给SID大的,一旦到达半数就选举出了Leader,其后的服务器都作为Follower。因此若服务器台数为奇数,则Leader为中位数位置的服务器。

image-20210824154209202

非第一次启动

选举Leader规则:

  1. EPOCH大的直接胜出
  2. EPOCH相同,ZXID大的胜出
  3. ZXID相同,SID大的胜出

image-20210824154403538

ZooKeeper 安装

ZooKeeper本地安装见Linux 开发环境配置文档

操作 ZooKeeper

启动 ZooKeeper

1
$ ~/bin/zkServer.sh start

查看状态

1
2
3
4
$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Mode: standalone

启动客户端

1
$ bin/zkCli.sh

退出客户端

1
[zk: localhost:2181(CONNECTED) 0] quit

停止 ZooKeeper

1
$ bin/zkServer.sh stop

配置参数解读

ZooKeeper中的配置文件zoo.cfg中参数含义解读如下:

  • tickTime = 2000:通信心跳时间,ZooKeeper服务器与客户端心跳时间,单位毫秒

image-20210824160926660

  • initLimit = 10:LF初始通信时限(Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量))
  • syncLimit = 5:LF同步通信时限(Leader和Follower之间通信时间如果超过syncLimit * tickTime,Leader认为Follwer死亡,从服务器列表中删除Follwer)
  • dataDir:保存Zookeeper中的数据(注意:默认的tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录)
  • clientPort = 2181:客户端连接端口,通常不做修改

ZooKeeper 集群安装

ZooKeeper 集群安装见视频:https://www.bilibili.com/video/BV1to4y1C7gw?p=9

常用命令

命令基本语法 功能描述
help 显示所有操作命令
ls path 使用 ls 命令来查看当前 znode 的子节点 [可监听] -w 监听子节点变化 -s 附加次级信息
create 普通创建 -s 含有序列 -e 临时(重启或者超时消失)
get path 获得节点的值 [可监听] -w 监听节点内容变化 -s 附加次级信息
set 设置节点的具体值
stat 查看节点状态
delete 删除节点
deleteall 递归删除节点

启动客户端

1
$ bin/zkCli.sh -server xxx:2181

显示所有操作命令

1
[zk: hadoop102:2181(CONNECTED) 1] help

查看当前znode中所包含的详细内容

1
2
3
4
5
6
7
8
9
10
11
12
[zk: hadoop102:2181(CONNECTED) 5] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
  • czxid:创建节点的事务 zxid 每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所 有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之 前发生。
  • ctime:znode 被创建的毫秒数(从 1970 年开始)
  • mzxid:znode 最后更新的事务 zxid
  • mtime:znode 最后修改的毫秒数(从 1970 年开始)
  • pZxid:znode 最后更新的子节点 zxid
  • cversion:znode 子节点变化号,znode 子节点修改次数
  • dataversion:znode 数据变化号
  • aclVersion:znode 访问控制列表的变化号
  • ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是 临时节点则是 0。
  • dataLength:znode 的数据长度
  • numChildren:znode 子节点数量

节点类型

  • 持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除
  • 短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除

image-20210824154555708

操作节点

  1. 分别创建2个普通节点:永久节点 + 不带序号(注意:创建节点时,要赋值)
1
2
3
4
[zk: localhost:2181(CONNECTED) 3] create /sanguo "diaochan"
Created /sanguo
[zk: localhost:2181(CONNECTED) 4] create /sanguo/shuguo "liubei"
Created /sanguo/shuguo
  1. 获得节点的值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[zk: localhost:2181(CONNECTED) 5] get -s /sanguo
diaochan
cZxid = 0x100000003
ctime = Wed Aug 29 00:03:23 CST 2018
mZxid = 0x100000003
mtime = Wed Aug 29 00:03:23 CST 2018
pZxid = 0x100000004
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 1

[zk: localhost:2181(CONNECTED) 6] get -s /sanguo/shuguo
liubei
cZxid = 0x100000004
ctime = Wed Aug 29 00:04:35 CST 2018
mZxid = 0x100000004
mtime = Wed Aug 29 00:04:35 CST 2018
pZxid = 0x100000004
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0
  1. 创建带序号的节点:永久节点 + 带序号

先创建一个普通的根节点/sanguo/weiguo

1
2
[zk: localhost:2181(CONNECTED) 1] create /sanguo/weiguo "caocao"
Created /sanguo/weiguo

创建带序号的节点

1
2
3
4
5
6
7
8
[zk: localhost:2181(CONNECTED) 2] create -s /sanguo/weiguo/zhangliao "zhangliao"
Created /sanguo/weiguo/zhangliao0000000000

[zk: localhost:2181(CONNECTED) 3] create -s /sanguo/weiguo/zhangliao "zhangliao"
Created /sanguo/weiguo/zhangliao0000000001

[zk: localhost:2181(CONNECTED) 4] create -s /sanguo/weiguo/xuchu "xuchu"
Created /sanguo/weiguo/xuchu0000000002

如果原来没有序号节点,序号从0开始依次递增。如果原节点下已有2个节点,则再排序时从 2 开始,以此类推。

  1. 创建短暂节点(短暂节点 + 不带序号 or 带序号)

创建短暂的不带序号的节点

1
2
[zk: localhost:2181(CONNECTED) 7] create -e /sanguo/wuguo "zhouyu"
Created /sanguo/wuguo

创建短暂的带序号的节点

1
2
[zk: localhost:2181(CONNECTED) 2] create -e -s /sanguo/wuguo "zhouyu"
Created /sanguo/wuguo0000000001

在当前客户端是能查看到的

1
2
[zk: localhost:2181(CONNECTED) 3] ls /sanguo
[wuguo, wuguo0000000001, shuguo]
  1. 修改节点数据值
1
[zk: localhost:2181(CONNECTED) 6] set /sanguo/weiguo "simayi"

监听器原理

  • 在主程序中创建(new)ZooKeeper客户端zkClient,这时就会创建两个线程:
    • 一个线程负责网络连接通信(connect)
    • 一个线程负责监听(listener)
  • 通过connect线程将注册的监听事件发送给ZooKeeper服务器zkServer
  • zkServer的注册监听器列表中将注册的监听事件添加到列表中
  • zkServer监听到相应事件发生时(有数据或路径变化),就会将这个消息发送给之前注册的zkClientlistener线程
  • zkClientlistener线程调用重写的process()方法。

image-20210824154624360

客户端zkClient的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// 当zk服务器监听到相应事件后发生时,调用当前zk客户端重写的process()方法
try {
getServerList();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

常见的监听:

  • 监听节点数据的变化:get path [watch]
  • 监听子节点增减的变化:ls path [watch]

客户端向服务器写数据的流程

当客户端直接向Leader写数据时

  • 客户端发来的消息(create xxx)发送给了Leader
  • Leader将该消息发送给其他的Follower直到半数以上Follower回复了确认消息ack
  • 此时再返回给客户端已收到了消息ack
  • 之后Leader继续将该消息发送给其他的Follower直到所有的Follower都收到了消息
  • 最后所有的服务器再一起执行命令(create xxx)

注意:ZooKeeper服务器同步的是消息命令(create xxx)而非数据本身。当所有服务器都收到了消息命令才会分别执行该命令。详细原理见消息广播

image-20210824163406289

当客户端向Follower写数据时

  • 该Follower会将该消息发送给Leader
  • 再由Leader将该消息同步到其他Follower,
  • 当半数以上Follower收到了该消息后,Leader回复最开始的Follower消息ack
  • 该Follower再回复客户端已收到消息ack
  • 之后Leader继续同步该消息直到所有Follower都收到了该消息命令
  • 所有服务器都收到消息后再分别执行该消息命令

与第一种情况相比多了一步:收到消息的Follower向Leader汇报收到了客户端的命令,Leader再将该命令进行同步。

总结:消息同步由Leader负责执行

image-20210824163357554

服务器动态上下线监听案例

先在集群上创建/servers 节点

1
2
[zk: localhost:2181(CONNECTED) 10] create /servers "servers"
Created /servers

引入Maven依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>

服务端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.zhao.case1;

import org.apache.zookeeper.*;

import java.io.IOException;

public class DistributeServer {

private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private int sessionTimeout = 2000;
private ZooKeeper zk;

public static void main(String[] args) throws IOException, KeeperException, InterruptedException {

DistributeServer server = new DistributeServer();
// 1 获取zk连接
server.getConnect();

// 2 注册服务器到zk集群
server.regist(args[0]);


// 3 启动业务逻辑(睡觉)
server.business();

}

private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}

private void regist(String hostname) throws KeeperException, InterruptedException {
String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

System.out.println(hostname +" is online") ;
}

private void getConnect() throws IOException {

zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {

}
});
}
}

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.zhao.case1;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DistributeClient {

private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private int sessionTimeout = 2000;
private ZooKeeper zk;

public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeClient client = new DistributeClient();

// 1 获取zk连接
client.getConnect();

// 2 监听/servers下面子节点的增加和删除
client.getServerList();

// 3 业务逻辑(睡觉)
client.business();

}

private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}

private void getServerList() throws KeeperException, InterruptedException {
List<String> children = zk.getChildren("/servers", true);

ArrayList<String> servers = new ArrayList<>();

for (String child : children) {

byte[] data = zk.getData("/servers/" + child, false, null);

servers.add(new String(data));
}

// 打印
System.out.println(servers);
}

private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {

try {
getServerList();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}

ZooKeeper 分布式锁

假设"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的 访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

image-20210824164337198

原生 Zookeeper 实现分布式锁案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package com.zhao.case2;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributedLock {

private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private final int sessionTimeout = 2000;
private final ZooKeeper zk;

private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);

private String waitPath;
private String currentMode;

public DistributedLock() throws IOException, InterruptedException, KeeperException {

// 获取连接
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// connectLatch 如果连接上zk 可以释放
if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
connectLatch.countDown();
}

// waitLatch 需要释放
if (watchedEvent.getType()== Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
waitLatch.countDown();
}
}
});

// 等待zk正常连接后,往下走程序
connectLatch.await();

// 判断根节点/locks是否存在
Stat stat = zk.exists("/locks", false);

if (stat == null) {
// 创建一下根节点
zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}

// 对zk加锁
public void zklock() {
// 创建对应的临时带序号节点
try {
currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

// wait一小会, 让结果更清晰一些
Thread.sleep(10);

// 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点

List<String> children = zk.getChildren("/locks", false);

// 如果children 只有一个值,那就直接获取锁; 如果有多个节点,需要判断,谁最小
if (children.size() == 1) {
return;
} else {
Collections.sort(children);

// 获取节点名称 seq-00000000
String thisNode = currentMode.substring("/locks/".length());
// 通过seq-00000000获取该节点在children集合的位置
int index = children.indexOf(thisNode);

// 判断
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
// 就一个节点,可以获取锁了
return;
} else {
// 需要监听 他前一个节点变化
waitPath = "/locks/" + children.get(index - 1);
zk.getData(waitPath,true,new Stat());

// 等待监听
waitLatch.await();

return;
}
}


} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}


}

// 解锁
public void unZkLock() {

// 删除节点
try {
zk.delete(this.currentMode,-1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}

}
}

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.zhao.case2;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

public class DistributedLockTest {

public static void main(String[] args) throws InterruptedException, IOException, KeeperException {

final DistributedLock lock1 = new DistributedLock();

final DistributedLock lock2 = new DistributedLock();

new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.zklock();
System.out.println("线程1 启动,获取到锁");
Thread.sleep(5 * 1000);

lock1.unZkLock();
System.out.println("线程1 释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {

try {
lock2.zklock();
System.out.println("线程2 启动,获取到锁");
Thread.sleep(5 * 1000);

lock2.unZkLock();
System.out.println("线程2 释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

}
}

Curator 框架实现分布式锁案例

原生的 Java API 开发存在的问题

  • 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
  • Watch 需要重复注册,不然就不能生效
  • 开发的复杂性还是比较高的
  • 不支持多节点删除和创建,需要自己去递归

Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。 详情请查看官方文档:https://curator.apache.org/index.html

Curator 案例实操

引入Maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package com.zhao.case3;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest {

public static void main(String[] args) {

// 创建分布式锁1
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

// 创建分布式锁2
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.acquire();
System.out.println("线程1 获取到锁");

lock1.acquire();
System.out.println("线程1 再次获取到锁");

Thread.sleep(5 * 1000);

lock1.release();
System.out.println("线程1 释放锁");

lock1.release();
System.out.println("线程1 再次释放锁");

} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.acquire();
System.out.println("线程2 获取到锁");

lock2.acquire();
System.out.println("线程2 再次获取到锁");

Thread.sleep(5 * 1000);

lock2.release();
System.out.println("线程2 释放锁");

lock2.release();
System.out.println("线程2 再次释放锁");

} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}

private static CuratorFramework getCuratorFramework() {

ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181,hadoop104:2181")
.connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(policy).build();

// 启动客户端
client.start();

System.out.println("zookeeper 启动成功");
return client;
}
}

ZooKeeper 常见问题

选举机制

半数机制,超过半数的投票通过,即通过。

  • 第一次启动选举规则: 投票过半数时,服务器 id 大的胜出
  • 第二次启动选举规则:
    • EPOCH 大的直接胜出
    • EPOCH 相同,事务 id 大的胜出
    • 事务 id 相同,服务器 id 大的胜出

生产集群安装多少 zk 合适?

安装奇数台。 生产经验:

  • 10 台服务器:3 台 zk;
  • 20 台服务器:5 台 zk;
  • 100 台服务器:11 台 zk;
  • 200 台服务器:11 台 zk

服务器台数多:好处,提高可靠性;坏处:提高通信延时

常用命令

ls、get、create、delete

ZooKeeper 原理

https://www.bilibili.com/video/BV1to4y1C7gw?p=30

Paxos 算法

image-20210824170129496

Paxos算法描述

image-20210824170151117

Paxos算法流程

image-20210824170206961

情况1:

image-20210824170235036

情况2:

image-20210824170251898

情况3:

image-20210824170313230

消息广播

image-20210824170331885

ZAB协议针对事务请求的处理过程类似于一个两阶段提交过程 :

  • 广播事务阶段
  • 广播提交操作

有可能因为Leader宕机带来数据不一致,比如

  • Leader 发 起 一 个 事 务 Proposal1 后 就 宕 机 , Follower 都 没 有 Proposal1
  • Leader收到半数ACK宕机, 没来得及向Follower发送Commit

怎么解决呢?ZAB引入了崩溃恢复模式。

崩溃恢复——异常假设

一旦Leader服务器出现崩溃或者由于网络原因导致Leader服务器失去了与过半 Follower的联系,那么就会进入崩溃恢复模式。

image-20210824170416176

崩溃恢复——Leader 选举

image-20210824170947242

崩溃恢复——数据恢复

image-20210824171017628

崩溃恢复——异常提案处理

image-20210824171113705

CAP 理论

image-20210824171128237

ZooKeeper保证的是CP

  • ZooKeeper不能保证每次服务请求的可用性。(注:在极端环境下,ZooKeeper可能会丢弃一些请求,消费者程序需要 重新请求才能获得结果)。所以说,ZooKeeper不能保证服务可用性。
  • 进行Leader选举时集群都是不可用。

持久化

每台服务器都会在磁盘保存snapShot快照TxnLog编辑日志,二者合起来就等于内存中的数据。其中服务器在收到命令时会更新TxnLog编辑日志的内容,过一段时间后再持久化到snapShot快照中。

image-20210824171159816

服务器间同步化

image-20210824171447010

ZK服务端初始化源码解析

image-20210824171524730

image-20210824171532346

ZK选举源码解析

image-20210824171547184

ZK选举准备源码解析

image-20210824171608595

ZK选举执行源码解析

image-20210824171754085

Follower 和 Leader 状态同步源码解析

image-20210824171809053

Follower 和 Leader 状态同步源码解析

image-20210824171829473

服务端Leader启动

image-20210824171851563

服务端Follower启动

image-20210824171904655

客户端初始化源码解析

image-20210824171925106