ZooKeeper 简介
ZooKeeper 是一个开源的分布式的,为分布式框架提供协调服务的Apache项目。ZooKeeper安装见Linux 开发环境配置文档
本文档参考尚硅谷ZooKeeper教程:https://www.bilibili.com/video/BV1to4y1C7gw?p=2
ZooKeeper 工作机制
ZooKeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。ZooKeeper服务器采用NIO与客户端进行通讯
ZooKeeper = 文件系统 + 通知机制
ZooKeeper 特点
- ZooKeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。
- 集群中只要有半数以上节点存活,ZooKeeper集群就能正常服务。所以ZooKeeper适合安装奇数台服务器。
- 全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。
- 更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行。
- 数据更新原子性:一次数据更新要么成功,要么失败。
- 实时性:在一定时间范围内,Client能读到最新数据。
ZooKeeper 应用场景
统一命名服务
统一配置管理
统一集群管理
服务器动态上下线
软负载均衡
CAP 理论
ZooKeeper保证的是CP
- ZooKeeper不能保证每次服务请求的可用性,但需要保证数据的一致性。(注:在极端环境下,ZooKeeper可能会丢弃一些请求,消费者程序需要 重新请求才能获得结果)。所以说,ZooKeeper不能保证服务可用性。
- 例如进行Leader选举时集群都是不可用。
ZooKeeper 选举机制
第一次启动时
根据SID决定,投票都交给SID大的,一旦到达半数就选举出了Leader,其后的服务器都作为Follower。因此若服务器台数为奇数,则Leader为中位数位置的服务器。
非第一次启动
选举Leader规则:
EPOCH
大的直接胜出
EPOCH
相同,ZXID
大的胜出
ZXID
相同,SID
大的胜出
ZooKeeper 安装
ZooKeeper本地安装见Linux 开发环境配置文档
操作 ZooKeeper
启动 ZooKeeper
1
| $ ~/bin/zkServer.sh start
|
查看状态
1 2 3 4
| $ bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Mode: standalone
|
启动客户端
退出客户端
1
| [zk: localhost:2181(CONNECTED) 0] quit
|
停止 ZooKeeper
配置参数解读
ZooKeeper中的配置文件zoo.cfg
中参数含义解读如下:
tickTime = 2000
:通信心跳时间,ZooKeeper服务器与客户端心跳时间,单位毫秒
initLimit = 10
:LF初始通信时限(Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量))
syncLimit = 5
:LF同步通信时限(Leader和Follower之间通信时间如果超过syncLimit * tickTime,Leader认为Follwer死亡,从服务器列表中删除Follwer)
dataDir
:保存Zookeeper中的数据(注意:默认的tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录)
clientPort = 2181
:客户端连接端口,通常不做修改
ZooKeeper 集群安装
ZooKeeper 集群安装见视频:https://www.bilibili.com/video/BV1to4y1C7gw?p=9
常用命令
命令基本语法 |
功能描述 |
help |
显示所有操作命令 |
ls path |
使用 ls 命令来查看当前 znode 的子节点 [可监听] -w 监听子节点变化 -s 附加次级信息 |
create |
普通创建 -s 含有序列 -e 临时(重启或者超时消失) |
get path |
获得节点的值 [可监听] -w 监听节点内容变化 -s 附加次级信息 |
set |
设置节点的具体值 |
stat |
查看节点状态 |
delete |
删除节点 |
deleteall |
递归删除节点 |
启动客户端
1
| $ bin/zkCli.sh -server xxx:2181
|
显示所有操作命令
1
| [zk: hadoop102:2181(CONNECTED) 1] help
|
查看当前znode中所包含的详细内容
1 2 3 4 5 6 7 8 9 10 11 12
| [zk: hadoop102:2181(CONNECTED) 5] ls -s / [zookeeper]cZxid = 0x0 ctime = Thu Jan 01 08:00:00 CST 1970 mZxid = 0x0 mtime = Thu Jan 01 08:00:00 CST 1970 pZxid = 0x0 cversion = -1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 1
|
czxid
:创建节点的事务 zxid 每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所 有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之 前发生。
ctime
:znode 被创建的毫秒数(从 1970 年开始)
mzxid
:znode 最后更新的事务 zxid
mtime
:znode 最后修改的毫秒数(从 1970 年开始)
pZxid
:znode 最后更新的子节点 zxid
cversion
:znode 子节点变化号,znode 子节点修改次数
dataversion
:znode 数据变化号
aclVersion
:znode 访问控制列表的变化号
ephemeralOwner
:如果是临时节点,这个是 znode 拥有者的 session id。如果不是 临时节点则是 0。
dataLength
:znode 的数据长度
numChildren
:znode 子节点数量
节点类型
- 持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除
- 短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除
操作节点
- 分别创建2个普通节点:永久节点 + 不带序号(注意:创建节点时,要赋值)
1 2 3 4
| [zk: localhost:2181(CONNECTED) 3] create /sanguo "diaochan" Created /sanguo [zk: localhost:2181(CONNECTED) 4] create /sanguo/shuguo "liubei" Created /sanguo/shuguo
|
- 获得节点的值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| [zk: localhost:2181(CONNECTED) 5] get -s /sanguo diaochan cZxid = 0x100000003 ctime = Wed Aug 29 00:03:23 CST 2018 mZxid = 0x100000003 mtime = Wed Aug 29 00:03:23 CST 2018 pZxid = 0x100000004 cversion = 1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 7 numChildren = 1
[zk: localhost:2181(CONNECTED) 6] get -s /sanguo/shuguo liubei cZxid = 0x100000004 ctime = Wed Aug 29 00:04:35 CST 2018 mZxid = 0x100000004 mtime = Wed Aug 29 00:04:35 CST 2018 pZxid = 0x100000004 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 6 numChildren = 0
|
- 创建带序号的节点:永久节点 + 带序号
先创建一个普通的根节点/sanguo/weiguo
1 2
| [zk: localhost:2181(CONNECTED) 1] create /sanguo/weiguo "caocao" Created /sanguo/weiguo
|
创建带序号的节点
1 2 3 4 5 6 7 8
| [zk: localhost:2181(CONNECTED) 2] create -s /sanguo/weiguo/zhangliao "zhangliao" Created /sanguo/weiguo/zhangliao0000000000
[zk: localhost:2181(CONNECTED) 3] create -s /sanguo/weiguo/zhangliao "zhangliao" Created /sanguo/weiguo/zhangliao0000000001
[zk: localhost:2181(CONNECTED) 4] create -s /sanguo/weiguo/xuchu "xuchu" Created /sanguo/weiguo/xuchu0000000002
|
如果原来没有序号节点,序号从0开始依次递增。如果原节点下已有2个节点,则再排序时从 2 开始,以此类推。
- 创建短暂节点(短暂节点 + 不带序号 or 带序号)
创建短暂的不带序号的节点
1 2
| [zk: localhost:2181(CONNECTED) 7] create -e /sanguo/wuguo "zhouyu" Created /sanguo/wuguo
|
创建短暂的带序号的节点
1 2
| [zk: localhost:2181(CONNECTED) 2] create -e -s /sanguo/wuguo "zhouyu" Created /sanguo/wuguo0000000001
|
在当前客户端是能查看到的
1 2
| [zk: localhost:2181(CONNECTED) 3] ls /sanguo [wuguo, wuguo0000000001, shuguo]
|
- 修改节点数据值
1
| [zk: localhost:2181(CONNECTED) 6] set /sanguo/weiguo "simayi"
|
监听器原理
- 在主程序中创建(new)ZooKeeper客户端
zkClient
,这时就会创建两个线程:
- 一个线程负责网络连接通信(connect)
- 一个线程负责监听(listener)
- 通过
connect
线程将注册的监听事件发送给ZooKeeper服务器zkServer
- 在
zkServer
的注册监听器列表中将注册的监听事件添加到列表中
- 当
zkServer
监听到相应事件发生时(有数据或路径变化),就会将这个消息发送给之前注册的zkClient
的listener
线程
zkClient
的listener
线程调用重写的process()
方法。
客户端zkClient
的代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13
| zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { try { getServerList(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } });
|
常见的监听:
- 监听节点数据的变化:
get path [watch]
- 监听子节点增减的变化:
ls path [watch]
客户端向服务器写数据的流程
当客户端直接向Leader写数据时
- 客户端发来的消息(create xxx)发送给了Leader
- Leader将该消息发送给其他的Follower直到半数以上Follower回复了确认消息ack
- 此时再返回给客户端已收到了消息ack
- 之后Leader继续将该消息发送给其他的Follower直到所有的Follower都收到了消息
- 最后所有的服务器再一起执行命令(create xxx)
注意:ZooKeeper服务器同步的是消息命令(create xxx)而非数据本身。当所有服务器都收到了消息命令才会分别执行该命令。详细原理见消息广播
当客户端向Follower写数据时
- 该Follower会将该消息发送给Leader
- 再由Leader将该消息同步到其他Follower,
- 当半数以上Follower收到了该消息后,Leader回复最开始的Follower消息ack
- 该Follower再回复客户端已收到消息ack
- 之后Leader继续同步该消息直到所有Follower都收到了该消息命令
- 所有服务器都收到消息后再分别执行该消息命令
与第一种情况相比多了一步:收到消息的Follower向Leader汇报收到了客户端的命令,Leader再将该命令进行同步。
总结:消息同步由Leader负责执行
服务器动态上下线监听案例
先在集群上创建/servers 节点
1 2
| [zk: localhost:2181(CONNECTED) 10] create /servers "servers" Created /servers
|
引入Maven依赖:
1 2 3 4 5
| <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.7</version> </dependency>
|
服务端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package com.zhao.case1;
import org.apache.zookeeper.*;
import java.io.IOException;
public class DistributeServer {
private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; private int sessionTimeout = 2000; private ZooKeeper zk;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeServer server = new DistributeServer(); server.getConnect();
server.regist(args[0]);
server.business();
}
private void business() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); }
private void regist(String hostname) throws KeeperException, InterruptedException { String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname +" is online") ; }
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) {
} }); } }
|
客户端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| package com.zhao.case1;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper;
import java.io.IOException; import java.util.ArrayList; import java.util.List;
public class DistributeClient {
private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; private int sessionTimeout = 2000; private ZooKeeper zk;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException { DistributeClient client = new DistributeClient();
client.getConnect();
client.getServerList();
client.business();
}
private void business() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); }
private void getServerList() throws KeeperException, InterruptedException { List<String> children = zk.getChildren("/servers", true);
ArrayList<String> servers = new ArrayList<>();
for (String child : children) {
byte[] data = zk.getData("/servers/" + child, false, null);
servers.add(new String(data)); }
System.out.println(servers); }
private void getConnect() throws IOException { zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) {
try { getServerList(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }
|
ZooKeeper 分布式锁
假设"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的 访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
原生 Zookeeper 实现分布式锁案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
| package com.zhao.case2;
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat;
import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch;
public class DistributedLock {
private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; private final int sessionTimeout = 2000; private final ZooKeeper zk;
private CountDownLatch connectLatch = new CountDownLatch(1); private CountDownLatch waitLatch = new CountDownLatch(1);
private String waitPath; private String currentMode;
public DistributedLock() throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected){ connectLatch.countDown(); }
if (watchedEvent.getType()== Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){ waitLatch.countDown(); } } });
connectLatch.await();
Stat stat = zk.exists("/locks", false);
if (stat == null) { zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } }
public void zklock() { try { currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
Thread.sleep(10);
List<String> children = zk.getChildren("/locks", false);
if (children.size() == 1) { return; } else { Collections.sort(children);
String thisNode = currentMode.substring("/locks/".length()); int index = children.indexOf(thisNode);
if (index == -1) { System.out.println("数据异常"); } else if (index == 0) { return; } else { waitPath = "/locks/" + children.get(index - 1); zk.getData(waitPath,true,new Stat());
waitLatch.await();
return; } }
} catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }
}
public void unZkLock() {
try { zk.delete(this.currentMode,-1); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); }
} }
|
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package com.zhao.case2;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class DistributedLockTest {
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
final DistributedLock lock1 = new DistributedLock();
final DistributedLock lock2 = new DistributedLock();
new Thread(new Runnable() { @Override public void run() { try { lock1.zklock(); System.out.println("线程1 启动,获取到锁"); Thread.sleep(5 * 1000);
lock1.unZkLock(); System.out.println("线程1 释放锁"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() {
try { lock2.zklock(); System.out.println("线程2 启动,获取到锁"); Thread.sleep(5 * 1000);
lock2.unZkLock(); System.out.println("线程2 释放锁"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start();
} }
|
Curator 框架实现分布式锁案例
原生的 Java API 开发存在的问题
- 会话连接是异步的,需要自己去处理。比如使用
CountDownLatch
- Watch 需要重复注册,不然就不能生效
- 开发的复杂性还是比较高的
- 不支持多节点删除和创建,需要自己去递归
Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。 详情请查看官方文档:https://curator.apache.org/index.html
Curator 案例实操
引入Maven依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version> </dependency>
|
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| package com.zhao.case3;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorLockTest {
public static void main(String[] args) {
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(new Runnable() { @Override public void run() { try { lock1.acquire(); System.out.println("线程1 获取到锁");
lock1.acquire(); System.out.println("线程1 再次获取到锁");
Thread.sleep(5 * 1000);
lock1.release(); System.out.println("线程1 释放锁");
lock1.release(); System.out.println("线程1 再次释放锁");
} catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { lock2.acquire(); System.out.println("线程2 获取到锁");
lock2.acquire(); System.out.println("线程2 再次获取到锁");
Thread.sleep(5 * 1000);
lock2.release(); System.out.println("线程2 释放锁");
lock2.release(); System.out.println("线程2 再次释放锁");
} catch (Exception e) { e.printStackTrace(); } } }).start(); }
private static CuratorFramework getCuratorFramework() {
ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181,hadoop104:2181") .connectionTimeoutMs(2000) .sessionTimeoutMs(2000) .retryPolicy(policy).build();
client.start();
System.out.println("zookeeper 启动成功"); return client; } }
|
ZooKeeper 常见问题
选举机制
半数机制,超过半数的投票通过,即通过。
- 第一次启动选举规则: 投票过半数时,服务器 id 大的胜出
- 第二次启动选举规则:
- EPOCH 大的直接胜出
- EPOCH 相同,事务 id 大的胜出
- 事务 id 相同,服务器 id 大的胜出
生产集群安装多少 zk 合适?
安装奇数台。 生产经验:
- 10 台服务器:3 台 zk;
- 20 台服务器:5 台 zk;
- 100 台服务器:11 台 zk;
- 200 台服务器:11 台 zk
服务器台数多:好处,提高可靠性;坏处:提高通信延时
常用命令
ls、get、create、delete
ZooKeeper 原理
https://www.bilibili.com/video/BV1to4y1C7gw?p=30
Paxos 算法
Paxos算法描述
Paxos算法流程
情况1:
情况2:
情况3:
消息广播
ZAB协议针对事务请求的处理过程类似于一个两阶段提交过程 :
有可能因为Leader宕机带来数据不一致,比如
- Leader 发 起 一 个 事 务 Proposal1 后 就 宕 机 , Follower 都 没 有 Proposal1
- Leader收到半数ACK宕机, 没来得及向Follower发送Commit
怎么解决呢?ZAB引入了崩溃恢复模式。
崩溃恢复——异常假设
一旦Leader服务器出现崩溃或者由于网络原因导致Leader服务器失去了与过半 Follower的联系,那么就会进入崩溃恢复模式。
崩溃恢复——Leader 选举
崩溃恢复——数据恢复
崩溃恢复——异常提案处理
CAP 理论
ZooKeeper保证的是CP
- ZooKeeper不能保证每次服务请求的可用性。(注:在极端环境下,ZooKeeper可能会丢弃一些请求,消费者程序需要 重新请求才能获得结果)。所以说,ZooKeeper不能保证服务可用性。
- 进行Leader选举时集群都是不可用。
持久化
每台服务器都会在磁盘保存snapShot快照和TxnLog编辑日志,二者合起来就等于内存中的数据。其中服务器在收到命令时会更新TxnLog编辑日志的内容,过一段时间后再持久化到snapShot快照中。
服务器间同步化
ZK服务端初始化源码解析
ZK选举源码解析
ZK选举准备源码解析
ZK选举执行源码解析
Follower 和 Leader 状态同步源码解析
Follower 和 Leader 状态同步源码解析
服务端Leader启动
服务端Follower启动
客户端初始化源码解析