# zookeeper-learning
**Repository Path**: 2452860/zookeeper-learning
## Basic Information
- **Project Name**: zookeeper-learning
- **Description**: zookeeper 的学习笔记
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 1
- **Forks**: 0
- **Created**: 2021-07-15
- **Last Updated**: 2021-09-21
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
ZooKeeper
ZooKeeper是一个分布式的,开放源代码的分布式应用协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。其设计目标是将那些复杂的且容易出错的分布式⼀致性服务封装起来,构成⼀个高效可靠的原语集,并以一些简单的接口提供给用户使用,是一个典型的分布式数据一致性的解决方案,我们可以基于它实现诸如数据订阅/发布、负载均衡、命名服务、集群管理、分布式锁和分布式队列等功能。
**集群**
ZooKeeper在集群管理中引入了Leader、Follower、Observer三种角色,Zookeeper集群中的所有机器通过Leader选举来选定⼀台被称为Leader的机器,Leader服务器为客户端提供读和写服务,除Leader外,其他机器包括Follower和Observer,其中Follower和Observer都能提供读服务,区别在于Observer不参与Leader选举过程,不参与写操作的过半写成功策略,因此Observer可以在不影响写性能的情况下提升集群的性能。
**session会话**
ZooKeeper客户端启动的时候,首先会与服务器建立⼀个TCP连接,从第⼀次连接建立开始,客户端会话的生命周期也开始了,通过这个连接,客户端能够通过心跳检测与服务器保持有效的会话,也能够向Zookeeper服务器发送请求并接受响应,同时还能够通过该连接接受来自服务器的Watch事件通知。
**Znode数据节点**
在ZooKeeper中,节点分为两类,第一类是指构成集群的机器,我们称之为机器节点;第二类则是指数据模型中的数据单元,我们称之为数据节点—ZNode。ZooKeeper将所有数据存储在内存中,其数据模型是一棵树(ZNode Tree),由斜杠(/)进行分割的路径,就是一个Znode,每个ZNode上都会保存自己的数据内容,同时还会保存一系列属性信息。
**版本**
Zookeeper的每个Znode上都会存储数据,对于每个ZNode,Zookeeper都会为其维护一个叫作Stat的数据结构,Stat记录了这个ZNode的三个数据版本,分别是version(当前ZNode的版本)、cversion(当前ZNode子节点的版本)、aversion(当前ZNode的ACL版本)。
**Watcher(事件监听器)**
Zookeeper允许用户在指定节点上注册⼀些Watcher,并且在一些特定事件触发的时候,Zookeeper服务端会将事件通知到感兴趣的客户端,该机制是Zookeeper实现分布式协调服务的重要特性。
**ACL**
Zookeeper采用ACL(Access Control Lists)策略来进行权限控制,其定义了如下五种权限
- CREATE:创建子节点的权限
- READ:获取节点数据和子节点列表的权限
- WRITE:更新节点数据的权限
- DELETE:删除子节点的权限
- ADMIN:设置节点ACL的权限
## ZooKeeper环境搭建
Zookeeper安装方式有三种,单机模式和集群模式以及伪集群模式
- 单机模式:Zookeeper只运行在⼀台服务器上,适合测试环境
- 集群模式:Zookeeper运行于⼀个集群上,适合生产环境
- 伪集群模式:就是在一台服务器上运行多个Zookeeper 实例
### 单机模式搭建
1. 下载安装包
```shell
wget https://downloads.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
```
2. 解压安装包
```shell
tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz
```
3. 在zookeeper 解压目录下创建data文件夹
```shell
cd apache-zookeeper-3.6.3-bin
mkdir data
```
4. 配置zoo.cfg
```shell
cd conf
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
```
修改zoo.cfg中的dataDir属性
dataDir= apache-zookeeper-3.6.3-bin/data
**配置路径的时候需要根据自己文件的所在路径做修改,比如/usr/local/apache-zookeeper-3.6.3-bin/data**
5. 启动zookeeper服务
```shell
cd apache-zookeeper-3.6.3-bin/bin
./zkServer.sh start
```
6. 查看服务状态
```shell
./zkServer.sh status
```
7. 停止服务
```shell
./zkServer.sh stop
```
### 伪集群模式搭建
1. 下载安装包
```shell
wget https://downloads.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
```
2. 在/user/local 目录下新建zkcluster目录
```shell
cd /usr/local
mkdir zkcluster
```
3. 将压缩包解压到/user/local/zkcluster 目录下
```shell
tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz -C /user/local/zkcluster
```
4. 将解压后的包重命名为zookeeper01
```shell
cd /user/local/zkcluster
mv apache-zookeeper-3.6.3-bin zookeeper01
```
5. 将 zookeeper01 复制为zookeeper02和zookeeper03
```shell
cp -r zookeeper01 zookeeper02
cp -r zookeeper01 zookeeper03
```
6. 分别在zookeeper01、zookeeper02、zookeeper03目录下创建data及logs目录
```shell
cd /user/local/zkcluster/zookeeper01
mkdir data
cd data
mkdir logs
cd /user/local/zkcluster/zookeeper02
mkdir data
cd data
mkdir logs
cd /user/local/zkcluster/zookeeper03
mkdir data
cd data
mkdir logs
```
7. 修改zookeeper01、zookeeper02、zookeeper03 配置文件名称
```shell
cd /user/local/zkcluster/zookeeper01/conf
mv zoo_sample.cfg zoo.cfg
cd /user/local/zkcluster/zookeeper02/conf
mv zoo_sample.cfg zoo.cfg
cd /user/local/zkcluster/zookeeper03/conf
mv zoo_sample.cfg zoo.cfg
```
8. 修改zookeeper01、zookeeper02、zookeeper03 配置文件内容
- zookeeper01/conf/zoo.cfg
```shell
clientPort=2181 #zookepper服务节点1端口号
dataDir=/user/local/zkcluster/zookeeper01/data
dataLogDir=/user/local/zkcluster/zookeeper01/data/logs
```
- zookeeper02/conf/zoo.cfg
```shell
clientPort=2182 #zookepper服务节点2端口号
dataDir=/user/local/zkcluster/zookeeper02/data
dataLogDir=/user/local/zkcluster/zookeeper02/data/logs
```
- zookeeper03/conf/zoo.cfg
```shell
clientPort=2183 #zookepper服务节点3端口号
dataDir=/user/local/zkcluster/zookeeper03/data
dataLogDir=/user/local/zkcluster/zookeeper03/data/logs
```
9. 分别在zookeeper01、zookeeper02、zookeeper03 data目录下创建myid文件,内容分别为1、2、3
```shell
cd /user/local/zkcluster/zookeeper01/data
touch myid
vim myid #写入内容1 保存退出
cd /user/local/zkcluster/zookeeper02/data
touch myid
vim myid #写入内容2 保存退出
cd /user/local/zkcluster/zookeeper03/data
touch myid
vim myid #写入内容3 保存退出
```
10. 在zookeeper01、zookeeper02、zookeeper03 配置文件中分别配置集群信息
```shell
server.1=10.211.55.4:2881:3881
server.2=10.211.55.4:2882:3882
server.3=10.211.55.4:2883:3883
#server.服务器ID=服务器IP地址:服务器之间通信端⼝:服务器之间投票选举端⼝
```
11. 启动 zookeeper01、zookeeper02、zookeeper03 服务
```shell
cd /user/local/zkcluster/zookeeper01/bin
./zkServer.sh start
cd /user/local/zkcluster/zookeeper02/bin
./zkServer.sh start
cd /user/local/zkcluster/zookeeper03/bin
./zkServer.sh start
```
## ZooKeeper基本应用
### ZooKeeper的系统模型
#### 数据模型Znode
在ZooKeeper中,数据保存到一个个数据节点上,这些数据节点统一被称为Znode节点。Znode是ZooKeeper中最小的数据单元,在Znode下面还可以在挂载Znode,从而形成了具有层次化的一棵树,称之为 ZNode Tree,它采用了类似文件系统的层级树状结构进行管理,如下图

**Znode的类型**
Zookeeper 节点类型可以分为三大类
- 持久性节点(Persistent)
- 临时性节点(Ephemeral)
- 顺序性节点(Sequential)
**在创建节点的时候通过组合可以生成以下四种节点类型:持久节点、持久顺序节点、临时节点、临时顺序节点**
**持久节点**
> 是ZooKeeper中最常见的一种节点,所谓持久节点,就是指节点被创建后会⼀直存在服务器,直到调用删除操作主动清除
**持久顺序节点**
> 有顺序的持久节点,其特性和持久节点一样,只是在创建节点的时候会在节点名后面加上一个数字后缀来表示其顺序。
**临时节点**
>会被自动清理的节点,它的生命周期和客户端会话绑在一起,客户端会话结束,节点会被删除掉。与持久性节点不同的是,临时节点不能创建子节点。
**临时顺序节点**
> 有顺序的临时节点,其特性和临时节点一致,只是在创建的时候会在节点名后面加上一个数字后缀来表示其顺序。
#### 事务ID
在ZooKeeper中,事务是指能够改变ZooKeeper服务器状态的操作,我们也称之为事务操作或更新操作,一般包括数据节点创建与删除、数据节点内容更新等操作。对于每一个事务请求,ZooKeeper都会为其分配一个全局唯一的事务ID,用 **ZXID** 来表示,通常是一个 64 位的数字。每一个 ZXID 对应一次更新操作,从这些ZXID中可以间接地识别出ZooKeeper处理这些更新操作请求的全局顺序。
#### Znode的状态信息
> Znode节点内容包括两部分即节点数据内容和节点状态信息

整个Znode节点包含两部分:节点数据内容和节点状态信息,图中[rpc,zookeeper]是数据内容,其他的属于节点状态信息。
cZxid: 表示Create ZXID 即节点被创建时的事务ID
ctime: 表示Create Time 即节点被创建时的时间
mZxid: 表示Modified ZXID 即节点最后一次被修改的时间
mtime: 表示Modified Time 即节点最后一次被修改的时间
pZxid: 表示该节点的子节点列表最后一次被修改时的事务ID。只有子节点列表变更才会更新 pZxid,
子节点内容变更不会更新
cversion: 表示子节点的版本号
dataVersion:表示内容版本号
aclVersion: 表示acl版本号
ephemeralOwner :表示创建该临时节点时的会话sessionID,如果是持久性节点那么值为0
dataLength : 表示数据长度
numChildren :表示直系子节点数
#### Watcher 数据变更通知
ZooKeeper使用Watcher机制实现分布式数据的发布/订阅功能,ZooKeeper 允许客户端向服务
端注册⼀个 Watcher 监听,当服务端的⼀些指定事件触发了这个 Watcher,那么就会向指定客户端发
送⼀个事件通知来实现分布式的通知功能 。

ZooKeeper的Watcher机制主要包括客户端线程、客户端WatchManager、ZooKeeper服务器三部分,客户端向ZooKeeper服务端注册的同时会将Watcher对象存储在客户端的WatchManager中,当ZooKeeper服务器触发Watcher事件后,会向客户端发送通知,客户端线程从WatcherManager中取出对应的Watcher对象来执行回调逻辑 。
#### ACL 保障数据安全
Zookeeper作为一个分布式协调框架,其内部存储了分布式系统运行时状态的元数据,这些元数据会直接影响基于Zookeeper进行构造的分布式系统的运行状态,因此,如何保障系统中数据的安全,从而避免因误操作所带来的数据随意变更而导致的数据库异常十分重要,在Zookeeper中,提供了一套完善的ACL(Access Control List)权限控制机制来保障数据的安全 。
- 权限模式: Scheme
权限模式用来确定权限验证过程中使用的检验策略
- IP
通过IP地址粒度来进行权限控制并支持按照网段方式式进行配置 ,如192.168.1.11或192.168.1.0/24
- Digest
使用“username:password”的形式对权限进行配置,是最常用的权限控制模式
- World
数据节点的访问权限对所有用户开放,所有用户都可以在不进行任何权限校验的情况下操作ZooKeeper上的数据,可以看作是⼀种特殊的Digest模式,它只有⼀个权限标识,即“world:
anyone”
- Super
超级用户权限,也是⼀种特殊的Digest模式。在Super模式下,超级用户可以对任意ZooKeeper上的数据节点进行任意操作
- 授权对象: ID
授权对象指的是权限赋予的用户或⼀个指定实体 ,例如 IP 地址或是机器等。在不同的权限模式下,授
权对象是不同的,表中列出了各个权限模式和授权对象之间的对应关系 。
| 权限模式 | 授权对象 |
| -------- | ------------------------------------------------------------ |
| IP | 通常是⼀个IP地址或IP段:例如: 192.168.10.110 或192.168.10.1/24 |
| Digest | 自定义,通常是username:BASE64(SHA-1(username:password)) |
| World | 只有⼀个ID : anyone |
| Super | 超级用户 |
- 权限
在ZooKeeper中,所有对数据的操作权限分为以下五大类:
- CREATE 数据节点的创建权限,允许授权对象在该数据节点下创建子节点
- DELETE 子节点的删除权限,允许授权对象删除该数据节点的子节点
- READ 数据节点的读取权限,允许授权对象访问该数据节点并读取其数据内容或子节点列表等
- WRITE 数据节点的更新权限,允许授权对象对该数据节点进行更新操作
- ADMIN 数据节点的管理权限,允许授权对象对该数据节点进行 ACL 相关的设置操作
### ZooKeeper的命令行操作
1. 通过zkClient 登录ZooKeeper
```shell
./zkCli.sh #本地连接
./zkCli.sh -server ip:port #连接指定的服务器
```
2. 创建节点
```shell
create [-s][-e] path data acl
#其中-s或-e分别指定节点特性,顺序或临时节点,若不指定,则创建持久节点; acl⽤来进⾏权限控制
#例如
create -s /zk-test 123 #创建了zk-test顺序节点
create -e /zk-temp 123 #创建了zk-temp临时节点
create /zk-permanent 123 #创建了zk-permanent永久节点
```
3. 读取节点
```shell
#ls命令可以列出Zookeeper指定节点下的所有子节点,但只能查看指定节点下的第一级的所有子节点
ls path #path表示的是指定数据节点的节点路径
#get命令可以获取Zookeeper指定节点的数据内容和属性信息
get path
```
4. 更新节点
```shell
set path data [version]
#data就是要更新的新内容, version表示数据版本,在zookeeper中,节点的数据是有版本概念的,这个参#数⽤于指定本次更新操作是基于Znode的哪⼀个数据版本进⾏的
#例如
set /zk-permanent 456 #将 zk-permanent 节点内容设置为456
```
5. 删除节点
```shell
delete path [version]
#若删除的节点存在子节点,那么无法删除必需先删除子节点,在删除父节点
```
### ZooKeeper的API使用
Zookeeper API共包含五个包,分别为
- org.apache.zookeeper
- org.apache.zookeeper.data
- org.apache.zookeeper.server
- org.apache.zookeeper.server.quorum
- org.apache.zookeeper.server.upgrade
```xml
org.apache.zookeeper
zookeeper
3.4.14
```
**创建会话**
> ZooKeeper 客户端和服务端会话的建立是⼀个异步的过程, 也就是说在程序中,构造方法会在处
> 理完客户端初始化⼯作后立即返回,在大多数情况下,此时并没有真正建立好⼀个可用的会话,在会话
> 的生命周期中处于“CONNECTING”的状态。 当该会话真正创建完毕后ZooKeeper服务端会向会话对应
> 的客户端发送⼀个事件通知,以告知客户端,客户端只有在获取这个通知之后,才算真正建立了会话。
```java
package com.h52mm.zookeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class CreateSession implements Watcher {
//让线程等待,不让main函数结束
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException {
/*
* 创建一个zk实例来连接zk服务器
* new Zookeeper(connectString,sesssionTimeOut,Wather)
* connectString: 连接地址: IP:端⼝
* sesssionTimeOut:会话超时时间:单位毫秒
* Wather:监听器(当特定事件触发监听时, zk会通过watcher通知到客户端)
*/
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateSession());
System.out.println(zooKeeper.getState());
countDownLatch.await();
//会话建立成功
System.out.println("Client Connected to zookeeper");
}
//zookeeper建立连接时异步的
//当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的
//watcher通知,在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上
//的等待阻塞,⾄此,会话创建完毕
@Override
public void process(WatchedEvent watchedEvent) {
//当连接创建了,服务端发送给客户端SyncConnected事件
if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
//连接已建立解除线程等待
countDownLatch.countDown();
}
}
}
```
**创建节点**
```java
package com.h52mm.zookeeper;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
public class CreateNote implements Watcher {
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws IOException, InterruptedException {
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateNote());
//不让main 函数停止
Thread.sleep(Integer.MAX_VALUE);
}
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
try {
createNodeSync();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void createNodeSync() throws KeeperException, InterruptedException, UnsupportedEncodingException {
/**
* path :节点创建的路径
* data[] :节点创建要保存的数据,是个byte类型的
* acl :节点创建的权限信息(4种类型)
* ANYONE_ID_UNSAFE : 表示任何⼈
* AUTH_IDS :此ID仅可⽤于设置ACL。它将被客户机验证的ID替换。
* OPEN_ACL_UNSAFE :这是⼀个完全开放的ACL(常⽤)-->world:anyone
* CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限
* createMode :创建节点的类型(4种类型)
* PERSISTENT:持久节点
* PERSISTENT_SEQUENTIAL:持久顺序节点
* EPHEMERAL:临时节点
* EPHEMERAL_SEQUENTIAL:临时顺序节点
String node = zookeeper.create(path,data,acl,createMode);
*/
//持久节点
String node_PERSISTENT = zooKeeper.create("/test_persistent", "persistent".getBytes(StandardCharsets.UTF_8),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//持久顺序节点
String node_PERSISTENT_SEQUENTIAL = zooKeeper.create("/test_persistent_sequential", "persistent_sequential".getBytes(StandardCharsets.UTF_8),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
//临时节点
String node_EPERSISTENT = zooKeeper.create("/test_ephemeral", "ephemeral".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
//临时顺序节点
String node_EPERSISTENT_SEQUENTIAL = zooKeeper.create("/test_ephemeral_sequential", "ephemeral_sequential".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("创建的持久节点是:" + node_PERSISTENT);
System.out.println("创建的持久顺序节点是:" + node_PERSISTENT_SEQUENTIAL);
System.out.println("创建的临时节点是:" + node_EPERSISTENT);
System.out.println("创建的临时顺序节点是:" + node_EPERSISTENT_SEQUENTIAL);
}
}
```
**获取节点数据 **
```java
package com.h52mm.zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.List;
public class GetNoteData implements Watcher {
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws IOException, InterruptedException {
zooKeeper = new ZooKeeper("10.78.230.13:2181", 5000, new GetNoteData());
//不让main 函数停止
Thread.sleep(Integer.MAX_VALUE);
}
@Override
public void process(WatchedEvent watchedEvent) {
//子节点列表发生变化时,服务器会发出NodeChildrenChanged通知,但不会把变化情况告诉给客户端
// 需要客户端自行获取,且通知是一次性的,需反复注册监听
if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
//获取节点数据
try {
List children = zooKeeper.getChildren(watchedEvent.getPath(), true);
System.out.println(children);
} catch (Exception e) {
e.printStackTrace();
}
}
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
try {
getNoteData();
getChildrens();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static void getNoteData() throws Exception {
/**
* path : 获取数据的路径
* watch : 是否开启监听
* stat : 节点状态信息
* null: 表示获取最新版本的数据
* zk.getData(path, watch, stat);
*/
byte[] data = zooKeeper.getData("/test_persistent/test-children", true,
null);
System.out.println(new String(data, "utf-8"));
}
private static void getChildrens() throws KeeperException, InterruptedException {
/*
path:路径
watch:是否要启动监听,当子节点列表发⽣变化,会触发监听
zooKeeper.getChildren(path, watch);
*/
List children = zooKeeper.getChildren("/test_persistent", true);
System.out.println(children);
}
}
```
**修改节点数据**
```java
package com.h52mm.zookeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class UpdateNote implements Watcher {
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws IOException, InterruptedException {
zooKeeper = new ZooKeeper("10.78.230.13:2181", 5000, new UpdateNote());
//不让main 函数停止
Thread.sleep(Integer.MAX_VALUE);
}
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
try {
updateNodeSync();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void updateNodeSync() throws Exception {
/*
path:路径
data:要修改的内容 byte[]
version:为-1,表示对最新版本的数据进⾏修改
zooKeeper.setData(path, data,version);
*/
byte[] data = zooKeeper.getData("/test_persistent", false, null);
System.out.println("修改前的值:"+new String(data));
//修改 stat:状态信息对象 -1:最新版本
Stat stat = zooKeeper.setData("/test_persistent", "修改内容45678".getBytes(), -1);
byte[] data2 = zooKeeper.getData("/test_persistent", false, null);
System.out.println("修改后的值:"+new String(data2, StandardCharsets.UTF_8));
}
}
```
**删除节点**
```java
package com.h52mm.zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
public class DeleteNote implements Watcher {
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws IOException, InterruptedException {
zooKeeper = new ZooKeeper("10.78.230.13:2181", 5000, new DeleteNote());
//不让main 函数停止
Thread.sleep(Integer.MAX_VALUE);
}
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
try {
deleteNodeSync();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void deleteNodeSync() throws KeeperException, InterruptedException {
/*
zooKeeper.exists(path,watch) :判断节点是否存在
zookeeper.delete(path,version) : 删除节点
*/
Stat exists = zooKeeper.exists("/test_persistent/test-children", false);
System.out.println(exists == null ? "该节点不存在":"该节点存在");
zooKeeper.delete("/test_persistent/test-children",-1);
Stat exists2 = zooKeeper.exists("/test_persistent/test-children", false);
System.out.println(exists2 == null ? "该节点不存在":"该节点存在");
}
}
```
### ZkClient
> ZkClient是一款简单高效的ZooKeeper 的java客户端,在GitHub上进行了开源,它让让Zookeeper API 使用起来更简单,它能非常方便订阅各种事件并自动重新绑定事件(会话建立、节点修改、节点删除、子节点变更等)
```xml
com.101tec
zkclient
0.10
```
**创建会话**
```java
package com.h52mm.zkclient;
import org.I0Itec.zkclient.ZkClient;
public class CreateSession {
/*
创建一个zkClient实例来进行连接
注意: zkClient通过对zookeeperAPI内部包装,将这个异步的会话创建过程同步化了
*/
public static void main(String[] args) {
ZkClient zkClient=new ZkClient("10.78.230.13:2181");
System.out.println("ZooKeeper session established");
}
}
```
**创建节点**
```java
package com.h52mm.zkclient;
import org.I0Itec.zkclient.ZkClient;
public class CreateNodeSample {
public static void main(String[] args) {
ZkClient zkClient=new ZkClient("10.78.230.13:2181");
System.out.println("ZooKeeper session established");
//创建持久节点
//createParents的值设置为true,可以递归创建节点
zkClient.createPersistent("/zkclient/test_parents",true);
System.out.println("success create znode");
}
}
```
**删除节点**
```java
package com.h52mm.zkclient;
import org.I0Itec.zkclient.ZkClient;
public class DelDataSample {
public static void main(String[] args) {
ZkClient zkClient=new ZkClient("10.78.230.13:2181");
System.out.println("ZooKeeper session established");
//删除
zkClient.deleteRecursive("/zkclient/test_parents");
System.out.println("success delete znode");
}
}
```
**获取子节点**
```java
package com.h52mm.zkclient;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.List;
public class GetChildrenSample {
public static void main(String[] args) throws InterruptedException {
ZkClient zkClient=new ZkClient("10.78.230.13:2181");
System.out.println("ZooKeeper session established");
List children = zkClient.getChildren("/zkclient");
System.out.println(children);
//注册监听时间
zkClient.subscribeChildChanges("/zkclient", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List currentChilds) throws Exception {
System.out.println(parentPath + " ' child changed, currentChilds:" + currentChilds);
}
});
zkClient.createPersistent("/zkclient/test_zk");
Thread.sleep(1000);
zkClient.delete("/zkclient/test_zk");
Thread.sleep(1000);
zkClient.delete("/zkclient");
Thread.sleep(Integer.MAX_VALUE);
}
}
```
**获取数据**
```java
package com.h52mm.zkclient;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
public class GetDataSample {
public static void main(String[] args) throws InterruptedException {
ZkClient zkClient=new ZkClient("10.78.230.13:2181");
System.out.println("ZooKeeper session established");
String path="/zkclient";
//判断节点是否存在
boolean exists = zkClient.exists(path);
if(!exists){
//不存在创建节点
//创建临时节点
zkClient.createEphemeral(path,"123");
}
//注册监听
zkClient.subscribeDataChanges(path, new IZkDataListener() {
//节点数据变更
@Override
public void handleDataChange(String path, Object data) throws Exception {
System.out.println(path+"该节点内容被更新,更新后的内容"+data);
}
//节点删除
@Override
public void handleDataDeleted(String path) throws Exception {
System.out.println(path+" 该节点被删除");
}
});
//获取节点内容
Object o = zkClient.readData(path);
System.out.println(o);
//更新
zkClient.writeData(path,"4567");
Thread.sleep(1000);
//删除
zkClient.delete(path);
Thread.sleep(1000);
}
}
```
### Curator
> curator是Netflix公司开源的⼀套Zookeeper客户端框架,和ZKClient⼀样, Curator解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连,反复注册Watcher和NodeExistsException异常等,是最流行的Zookeeper客户端之⼀。
```xml
org.apache.curator
curator-framework
2.12.0
```
**创建会话**
1. 使用CuratorFramework这个工厂类的两个静态方法来创建⼀个客户端
```java
public static CuratorFramework newClient(String connectString, RetryPolicy
retryPolicy)
public static CuratorFramework newClient(String connectString, int
sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
```
其中参数RetryPolicy提供重试策略的接口,可以让用户实现自定义的重试策略,默认提供了以下实现,分别为ExponentialBackoffRetry(基于backoff的重连策略)、 RetryNTimes(重连N次策略)、RetryForever(永远重试策略)
2. 通过调用CuratorFramework中的start()方法来启动会话
```java
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
private static CuratorFramework Client = CuratorFrameworkFactory.builder()
.connectString("server1:2181,server2:2181,server3:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(retryPolicy)
.build();
client.start();
```
参数:
- connectString: zk的server地址,多个server之间使用英文逗号分隔开
- connectionTimeoutMs:连接超时时间,如上是30s,默认是15s
- sessionTimeoutMs:会话超时时间,如上是50s,默认是60s
- retryPolicy:失败重试策略
- ExponentialBackoffRetry:构造器含有三个参数 ExponentialBackoffRetry(int
baseSleepTimeMs, int maxRetries, int maxSleepMs)
- baseSleepTimeMs:初始的sleep时间,用于计算之后的每次重试的sleep时间
- 计算公式:当前sleep时间=baseSleepTimeMs*Math.max(1,
random.nextInt(1<<(retryCount+1)))
- maxRetries:最大重试次数
- maxSleepMs:最大sleep时间,如果上述的当前sleep计算出来比这个大,那么sleep用这个时间,默认的最大时间是Integer.MAX_VALUE毫秒
- 其他,查看org.apache.curator.RetryPolicy接口的实现类
- start():完成会话的创建
```java
package com.h52mm.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CreateCuratorSessionSample {
public static void main(String[] args) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("10.78.230.13:2181", 5000, 3000, retryPolicy);
client.start();
System.out.println("Zookeeper session1 established");
CuratorFramework client1 = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") //server地址
.sessionTimeoutMs(5000) // 会话超时时间
.connectionTimeoutMs(3000) // 连接超时时间
.retryPolicy(retryPolicy) // 重试策略
.namespace("base") // 独⽴命名空间/base
.build(); //
client1.start();
System.out.println("Zookeeper session2 established");
}
}
```
**session2会话含有隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相对/base目录进行的,这有利于实现不同的Zookeeper的业务之间的隔离 **
**创建节点**
- 创建⼀个初始内容为空的节点
```java
client.create().forPath(path);
```
- 创建一个包含内容的节点
```java
client.create().forPath(path,"内容".getBytes());
```
- 递归创建夫节点,并选择节点类型
```java
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPa
th(path);
```
```java
package com.h52mm.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class CreateCuratorSample {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("10.78.230.13:2181,10.78.230.13:2182,10.78.230.13:2183") //server地址
.sessionTimeoutMs(5000) // 会话超时时间
.connectionTimeoutMs(3000) // 连接超时时间
.retryPolicy(retryPolicy) // 重试策略
.namespace("curator") // 独⽴命名空间/base
.build(); //
client.start();
System.out.println("Zookeeper session established");
//添加节点
String path = "/test-curator/c1";
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());
Thread.sleep(1000);
System.out.println("success create znode"+path);
}
}
```
**删除节点**
1. 删除⼀个子节点
```java
client.delete().forPath(path);
```
2. 删除节点并递归删除其子节点
```java
client.delete().deletingChildrenIfNeeded().forPath(path);
```
3. 指定版本进行删除
```java
client.delete().withVersion(1).forPath(path);
```
4. 强制保证删除一个节点
```java
client.delete().guaranteed().forPath(path);
```
```java
package com.h52mm.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class DelCuratorSample {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("10.78.230.13:2182") //server地址
.sessionTimeoutMs(50000) // 会话超时时间
.connectionTimeoutMs(30000) // 连接超时时间
.retryPolicy(retryPolicy) // 重试策略
.namespace("curator") // 独⽴命名空间/base
.build(); //
client.start();
System.out.println("Zookeeper session established");
String path="/test_del";
client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path);
System.out.println("success create znode"+path);
}
}
```
**获取数据**
```java
// 普通查询
client.getData().forPath(path);
// 包含状态查询
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
```
```java
package com.h52mm.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class GetCuratorNodeSample {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("10.78.230.13:2182") //server地址
.sessionTimeoutMs(50000) // 会话超时时间
.connectionTimeoutMs(30000) // 连接超时时间
.retryPolicy(retryPolicy) // 重试策略
.namespace("curator") // 独⽴命名空间/base
.build(); //
client.start();
System.out.println("Zookeeper session established");
//添加节点
String path = "/test_get";
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());
System.out.println("success create znode"+path);
//获取节点数据
Stat stat = new Stat();
byte[] bytes = client.getData().storingStatIn(stat).forPath(path);
System.out.println(new String(bytes));
}
}
```
**更新数据**
```java
// 普通更新
client.setData().forPath(path,"新内容".getBytes());
// 指定版本更新
client.setData().withVersion(1).forPath(path);
```
```java
package com.h52mm.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
public class SetCuratorNodeSample {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("10.78.230.13:2182") //server地址
.sessionTimeoutMs(50000) // 会话超时时间
.connectionTimeoutMs(300000) // 连接超时时间
.retryPolicy(retryPolicy) // 重试策略
.namespace("curator") // 独⽴命名空间/base
.build(); //
client.start();
System.out.println("Zookeeper session established");
//获取节点数据
String path = "/test_get";
Stat stat = new Stat();
byte[] bytes = client.getData().storingStatIn(stat).forPath(path);
System.out.println(new String(bytes));
//更新节点数据
int version = client.setData().withVersion(stat.getVersion()).forPath(path,"新内容".getBytes(StandardCharsets.UTF_8)).getVersion();
System.out.println("Success set node for : " + path + ", new version: "+version);
client.setData().withVersion(stat.getVersion()).forPath(path,"新内容1".getBytes(StandardCharsets.UTF_8)).getVersion();
}
}
```
**第二个更新数据,因为版本号不一致,会抛出异常Exception in thread "main" org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /curator/test_get**
## ZooKeeper的应用场景
> ZooKeeper是一个典型的发布/订阅模式的分布式数据管理与协调框架,我们可以使用它来进行分布式
> 数据的发布与订阅。另⼀方面,通过对ZooKeeper中丰富的数据节点类型进行交叉使用,配合Watcher事件通知机制,可以非常方便地构建⼀系列分布式应用中都会涉及的核心功能,如数据发布/订阅、命名服务、集群管理、 Master选举、分布式锁和分布式队列等。
### 数据发布/订阅
数据发布/订阅(Publish/Subscribe)系统 ,即所谓的配置中心,就是发布者将数据发布到ZooKeeper的一个或一系列节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的,实现配置信息的集中式管理和数据的动态更新 。
ZooKeeper客户端向ZooKeeper服务端注册自己需要关注的节点,一旦该节点的数据发生变更,那么服务端就会向相应的客户端发送Watcher事件通知,客户端接收到这个消息通知之后,需要主动到服务端获取最新的数据。
如果将配置信息存放到ZooKeeper上进行集中管理,那么通常情况下,应用在启动的时候都会主动到ZooKeeper服务端上进行⼀次配置信息的获取,同时,在指定节点上注册⼀个Watcher监听,这样⼀来,但凡配置信息发生变更,服务端都会实时通知到所有订阅的客户端,从而达到实时获取最新配置信息的目的。
### 命名服务
命名服务(Name Service)也是分布式系统中比较常见的⼀类场景,是分布式系统最基本的公共服务之⼀。在分布式系统中,被命名的实体通常是集群中的机器、提供的服务地址或远程对象等,例如⼀些分布式服务框架(如RPC、 RMI)中的服务地址列表,通过使用命名服务,客户端应用能够根据指定名字来获取资源的实体、服务地址和提供者的基本信息等。
### 集群管理
集群管理,包括集群监控与集群控制两大块,前者侧重对集群运行时状态的收集,后者则是对集群进行操作与控制 。
### Master选举
在分布式系统中, Master往往用来协调集群中其他系统单元,具有对分布式系统状态变更的决定权 。例如,在⼀些读写分离的应用场景中,客户端的写请求往往是由 Master来处理的;而在另⼀些场景中,Master则常常负责处理⼀些复杂的逻辑,并将处理结果同步给集群中其他系统单元 。
### 分布式锁
分布式锁是控制分布式系统之间同步访问共享资源的⼀种方式 ,如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要通过一些互斥手段来防止彼此之间的干扰,以保证数据一致性,在这种情况下往往就需要使用分布式锁。
**排他锁**
排他锁(Exclusive Locks,简称 X 锁),又称为写锁或独占锁,是一种基本的锁类型。如果事务 T1对数据对象 O1加上了排他锁,那么在整个加锁期间,只允许事务 T1对 O1进行读取和更新操作,其他任何事务都不能再对这个数据对象进行任何类型的操作——直到T1释放了排他锁,例如数据库中的行锁。
ZooKeeper实现排他锁
1. 定义锁
通过 ZooKeeper上的数据节点来定义一个锁
2. 获取锁
在需要获取排他锁时,所有的客户端都会试图通过调用 create()接口,在节点下创建临时子节点ZooKeeper 会保证在所有的客户端中,最终只有一个客户端能够创建成功,那么就可以认为该客户端获取了锁,同时,所有没有获取到锁的客户端就需要到该节点上注册一个子节点变更的Watcher监听,以便实时监听到lock节点的变更情况 。
3. 释放锁
因为创建的是临时节点,当获取锁的客户端宕机或者是主动删除节点就会释放该锁,释放锁后ZooKeeper会通知所有在该节点上注册了子节点变更Watcher监听的客户端。这些客户端在接收到通知后,再次重新发起分布式锁获取。
**共享锁**
共享锁(Shared Locks,简称S锁),又称为读锁,如果事务T1对数据对象O1加上了共享锁,那么当前事务只能对O1进行读取操作,其他事务也只能对这个数据对象加共享锁——直到该数据对象上的所有共享锁都被释放 ,和排他锁最根本的区别是,加上排他锁后,数据对象只对一个事务可见,而加上共享锁后,数据对象对所有事务可见。
ZooKeeper实现共享锁
1. 定义锁
和排他锁一样,通过 ZooKeeper上的数据节点来定义一个锁 (临时顺序节点)
2. 获取锁
在需要获取共享锁时,所有客户端都会到节点下面创建⼀个临时顺序节点,如果当前是读请求,那么就创建带有-R的临时节点,例如/test/test-R-00000001;如果是写请求,那么就创建带有-W的临时节点例如/test/test-W-00000002。
判断读写顺序
1. 创建完节点后,获取该节点下所有子节点,并对该节点变更注册监听
2. 确定自己的节点序号在所有子节点中的顺序
3. 对于读请求:若没有比自己序号小的子节点或所有比自己序号小的子节点都是读请求,那么表
明自己已经成功获取到共享锁,同时开始执行读取逻辑,若有写请求,则需要等待。对于写请求:若自己不是序号最小的子节点,那么需要等待。
4. 接收到Watcher通知后,重复步骤1
3. 释放锁
其释放锁的流程与排他锁⼀致
**羊群效应**
羊群效应就是一个特定的Znode 改变的时候ZooKeper 触发了所有watches 的事件,比如上面的共享锁实现过程,当集群中的机器非常多的时候,一个子节点的操作就会触发大量的watches事件,大量的Watchert通知和子节点列表获取两个操作会重复运行,这样不仅会对zookeeper服务器造成巨大的性能影响影响和网络开销,更为严重的是,如果同⼀时间有多个节点对应的客户端完成事务或是事务中断引起节点消失, ZooKeeper服务器就会在短时间内向其余客户端发送大量的事件通知。
为了避免羊群效应,我们可以对共享锁的实现步骤做下改进,即每个锁竞争者只关注比自己小的那个节点是否存在即可。
1. 客户端调用create接口创建临时顺序节点
2. 客户端调用getChildren接口获取所有已经创建的子节点列表(不注册任何Watcher)
3. 如果无法获取共享锁,就调用exist接口来对比自己小的节点注册Watcher。对于读请求:向比自己序号小的最后⼀个写请求节点注册Watcher监听。对于写请求:向比自己序号小的最后一个节点注册Watcher监听
4. 等待Watcher通知 ,继续进入步骤2
### 分布式队列
分布式队列可以简单分为FIFO先入先出队列和等待队列元素聚集后统一安排处理执行的Barrier模型
- FIFO先入先出队列
使用ZooKeeper实现FIFO队列
1. 所有客户端都会向ZooKeeper某节点下创建一个临时顺序节点
2. 调用getChildren接口来获取该节点下的所有子节点,从而获取队列中所有的元素
3. 确定自己的节点序号在所有子节点中的顺序
4. 如果自己的序号不是最小,那么需要等待,同时向比自己序号小的最后⼀个节点注册Watcher监听
5. 接收到Watcher通知后,重复步骤1
- Barrier
Barrier原意是指障碍物、屏障,而在分布式系统中,特指系统之间的一个协调条件,规定了一个队列的
元素必须都集聚后才能统一进行安排,否则一直等待。 其实是在FIFO队列的基础上进行了增强。
使用ZooKeeper实现Barrier
1. 在创建父节点的时候,内容写入一个数字n代表Barrier值
2. 所有的客户端都会到该父节点下创建一个临时节点
3. 节点创建完毕后,客户端调用getData接口获取父节点的数据内容
4. 调用getChildren接口获取父节点下的所有子节点,同时注册对子节点变更的Watcher监听
5. 统计子节点的个数
6. 如果子节点个数还不足Barrier个,那么需要等待
7. 接受到Wacher通知后,重复步骤2