zookeeper应用相关
common-service:公共的服务接口以及其他的公用类
consumer-demo:消费者demo
provider-service1-impl: 提供者服务1 实现服务接口
provider-service2-impl: 提供者服务2 实现服务接口
zkconfig-demo: zk配置中心实现demo
Zookeeper是⼀个开源的分布式协调服务,其设计⽬标是将那些复杂的且容易出错的分布式⼀致性服务封装起来,构成⼀个⾼效可靠的原语集,并以⼀些简单的接⼝提供给⽤户使⽤。zookeeper是⼀个典型的分布式数据⼀致性的解决⽅案,分布式应⽤程序可以基于它实现诸如数据订阅/发布、负载均衡、命名服务、集群管理、分布式锁和分布式队列等功能
通常在分布式系统中,构成⼀个集群的每⼀台机器都有⾃⼰的⻆⾊,最典型的集群就是Master/Slave模式(主备模式),此情况下把所有能够处理写操作的机器称为Master机器,把所有通过异步复制⽅式获取最新数据,并提供读服务的机器为Slave机器。
⽽在Zookeeper中,这些概念被颠覆了。它没有沿⽤传递的Master/Slave概念,⽽是引⼊了Leader、Follower、Observer三种⻆⾊。Zookeeper集群中的所有机器通过Leader选举来选定⼀台被称为Leader的机器,Leader服务器为客户端提供读和写服务,除Leader外,其他机器包括Follower和Observer,Follower和Observer都能提供读服务,唯⼀的区别在于Observer不参与Leader选举过程,不参与写操作的过半写成功策略,因此Observer可以在不影响写性能的情况下提升集群的性能。
Session指客户端会话,⼀个客户端连接是指客户端和服务端之间的⼀个TCP⻓连接,Zookeeper对外的服务端⼝默认为2181,客户端启动的时候,⾸先会与服务器建⽴⼀个TCP连接,从第⼀次连接建⽴开始,客户端会话的⽣命周期也开始了,通过这个连接,客户端能够⼼跳检测与服务器保持有效的会话,也能够向Zookeeper服务器发送请求并接受响应,同时还能够通过该连接接受来⾃服务器的Watch事件通知。
在谈到分布式的时候,我们通常说的“节点”是指组成集群的每⼀台机器。然⽽,在ZooKeeper中,“节点”分为两类,第⼀类同样是指构成集群的机器,我们称之为机器节点;第⼆类则是指数据模型中的数据单元,我们称之为数据节点——ZNode。ZooKeeper将所有数据存储在内存中,数据模型是⼀棵树(ZNode Tree),由斜杠(/)进⾏分割的路径,就是⼀个Znode,例如/app/path1。每个ZNode上都会保存⾃⼰的数据内容,同时还会保存⼀系列属性信息。
Zookeeper 节点类型可以分为三⼤类:
在开发中在创建节点的时候通过组合可以⽣成以下四种节点类型:持久节点、持久顺序节点、临时节点、临时顺序节点。不同类型的节点则会有不同的⽣命周期
持久节点:是Zookeeper中最常⻅的⼀种节点类型,所谓持久节点,就是指节点被创建后会⼀直存在服务器,直到删除操作主动清除
持久顺序节点:就是有顺序的持久节点,节点特性和持久节点是⼀样的,只是额外特性表现在顺序上。顺序特性实质是在创建节点的时候,会在节点名后⾯加上⼀个数字后缀,来表示其顺序。
临时节点:就是会被⾃动清理掉的节点,它的⽣命周期和客户端会话绑在⼀起,客户端会话结束,节点会被删除掉。与持久性节点不同的是,临时节点不能创建⼦节点。
临时顺序节点:就是有顺序的临时节点,和持久顺序节点相同,在其创建的时候会在名字后⾯加上数字后缀
4、版本
Zookeeper的每个Znode上都会存储数据,对于每个ZNode,Zookeeper都会为其维护⼀个叫作Stat的数据结构,Stat记录了这个ZNode的三个数据版本,分别是version(当前ZNode的版本)、cversion(当前ZNode⼦节点的版本)、aversion(当前ZNode的ACL版本)。
Wathcer(事件监听器),是Zookeeper中⼀个很重要的特性,Zookeeper允许⽤户在指定节点上注册⼀些Watcher,并且在⼀些特定事件触发的时候,Zookeeper服务端会将事件通知到感兴趣的客户端,该机制是Zookeeper实现分布式协调服务的重要特性
Zookeeper使⽤Watcher机制实现分布式数据的发布/订阅功能
Zookeeper的Watcher机制主要包括客户端线程、客户端WatcherManager、Zookeeper服务器三部分。
具体⼯作流程为:客户端在向Zookeeper服务器注册的同时,会将Watcher对象存储在客户端的WatcherManager当中。当Zookeeper服务器触发Watcher事件后,会向客户端发送通知,客户端线程从WatcherManager中取出对应的Watcher对象来执⾏回调逻辑。
Zookeeper作为⼀个分布式协调框架,其内部存储了分布式系统运⾏时状态的元数据,这些元数据会直接影响基于Zookeeper进⾏构造的分布式系统的运⾏状态,因此,如何保障系统中数据的安全,从⽽避免因误操作所带来的数据随意变更⽽导致的数据库异常⼗分重要,在Zookeeper中,提供了⼀套完善的ACL(Access Control List)权限控制机制来保障数据的安全。 我们可以从三个⽅⾯来理解ACL机制:权限模式(Scheme)、授权对象(ID)、权限Permission),通常使⽤"scheme: id : permission"来标识⼀个有效的ACL信息。
权限模式(Scheme) ⽤来确定权限验证过程中使⽤的检验策略,有如下四种模式:
授权对象(ID)
授权对象指的是权限赋予的⽤户或⼀个指定实体,例如 IP 地址或是机器等。在不同的权限模式下,授权对象是不同的
权限模式 | 授权对象 |
---|---|
IP | 通常是⼀个IP地址或IP段:例如:192.168.10.110 或192.168.10.1/24 |
Digest | ⾃定义,通常是username:BASE64(SHA-1(username:password))例如:zm:sdfndsllndlksfn7c= |
Digest | 只有⼀个ID :anyone |
Super | 超级⽤户 |
权限Permission有如下五种 :
其中需要注意的是,CREATE和DELETE这两种权限都是针对⼦节点的权限控制
Zookeeper安装⽅式有三种:
zookeeper安装以windows环境为例:
dataDir=G:\LGStudy\CTools\Zookeeper\zookeeper1\data
dataLogDir=G:\LGStudy\CTools\Zookeeper\zookeeper1\logs
Zookeeper不但可以在单机上运⾏单机模式Zookeeper,⽽且可以在单机模拟集群模式 Zookeeper的运⾏,也就是将不同实例运⾏在同⼀台机器,⽤端⼝进⾏区分,伪集群模式为我们体验Zookeeper和做⼀些尝试性的实验提供了很⼤的便利。⽐如,我们在测试的时候,可以先使⽤少量数据在伪集群模式下进⾏测试。当测试可⾏的时候,再将数据移植到集群模式进⾏真实的数据实验。这样不但保证了它的可⾏性,同时⼤⼤提⾼了实验的效率。这种搭建⽅式,⽐较简便,成本⽐较低,适合测试和学习
注意事项: ⼀台机器上部署了3个server,也就是说单台机器及上运⾏多个Zookeeper实例。这种情况下,必须保证每个配置⽂档的各个端⼝号不能冲突,除clientPort不同之外,dataDir也不同。另外,还要在dataDir所对应的⽬录中创建myid⽂件来指定对应的Zookeeper服务器实例
如果在1台机器上部署多个server,那么每台机器都要不同的 clientPort,⽐如 server1是2181,server2是2182,server3是2183
dataDir和dataLogDir也需要区分下,将数据⽂件和⽇志⽂件分开存放,同时每个server的这两变量所对应的路径都是不同的
server.X 这个数字就是对应,data/myid中的数字。在3个server的myid⽂件中分别写⼊了1,2,3,那么每个server中的zoo.cfg都配 server.1 server.2,server.3就⾏了。因为在同⼀台机器上,后⾯连着的2个端⼝,3个server都不要⼀样,否则端⼝冲突
1、在单机部署的基础上复制zookeeper2和zookeeper3
2、在zookeeper1、2、3的 data ⽬录下创建⼀个 myid ⽂件,内容分别是1、2、3,这个⽂件就是记录每个服务器的ID
3、分别修改zookeeper1、2、3的zoo.cfg
#server.服务器ID=服务器IP地址:服务器之间通信端⼝:服务器之间投票选举端⼝
通过zkClient进⼊zookeeper客户端命令⾏
./zkcli.sh 连接本地的zookeeper服务器
./zkCli.sh -server ip:port 连接指定的服务器
创建节点
create [-s][-e] path data acl
其中,-s或-e分别指定节点特性,顺序或临时节点,若不指定,则创建持久节点;acl⽤来进⾏权限控制。
读取节点 与读取相关的命令有ls 命令和get 命令
ls命令可以列出Zookeeper指定节点下的所有⼦节点,但只能查看指定节点下的第⼀级的所有⼦节点;
ls path
其中,path表示的是指定数据节点的节点路径
get命令可以获取Zookeeper指定节点的数据内容和属性信息
get path
更新节点
set path data [version]
删除节点
delete path [version]
若删除节点存在⼦节点,那么⽆法删除该节点,必须先删除⼦节点,再删除⽗节点
Zookeeper作为⼀个分布式框架,主要⽤来解决分布式⼀致性问题,它提供了简单的分布式原语,并且对多种编程语⾔提供了API,所以接下来重点来看下Zookeeper的java客户端API使⽤⽅式
Zookeeper API共包含五个包,分别为:
其中org.apache.zookeeper,包含Zookeeper类,他是我们编程时最常⽤的类⽂件。这个类是Zookeeper客户端的主要类⽂件。如果要使⽤Zookeeper服务,应⽤程序⾸先必须创建⼀个Zookeeper实例,这时就需要使⽤此类。⼀旦客户端和Zookeeper服务端建⽴起了连接,Zookeeper系统将会给本次连接会话分配⼀个ID值,并且客户端将会周期性的向服务器端发送⼼跳来维持会话连接。只要连接有效,客户端就可以使⽤Zookeeper API来做相应处理了。
所需依赖:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
使用示例:Zookeeper API示例代码
package com.ozdemo.zkdemo.zkApi;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.util.List;
/**
* @Description: ZkApi管理类
* @Author: Created by OrangeZh
* @Date: Created in 2020/10/20 18:25
*/
public class ZkTestManager {
private ZkTestManager() {
}
private static ZkTestManager instance;
public static ZkTestManager getInstance() {
if (instance == null) {
instance = new ZkTestManager();
}
return instance;
}
/**
* 创建连接
* <p>
* 客户端可以通过创建一个zk实例来连接zk服务器
* new Zookeeper(connectString,sessionTimeOut,Wather)
* connectString: 连接地址:IP:端口
* sessionTimeOut:会话超时时间:单位毫秒
* Watcher:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
public ZooKeeper createSession() {
ZooKeeper zooKeeper = null;
try {
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new ZkTest());
System.out.println(zooKeeper.getState());
} catch (Exception e) {
e.printStackTrace();
}
return zooKeeper;
}
/**
* 创建节点的方法
* <p>
* path:节点创建的路径
* data[]:节点创建要保存的数据,是个byte类型的
* acl:节点创建的权限信息(4种类型)
* * * * * * ANYONE_ID_UNSAFE : 表示任何人
* * * * * * AUTH_IDS :此ID仅可用于设置ACL。它将被客户机验证的ID替换。
* * * * * * OPEN_ACL_UNSAFE :这是一个完全开放的ACL(常用)--> world:anyone
* * * * * * CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限
* createMode:创建节点的类型(4种类型)
* * * * * * PERSISTENT:持久节点
* * * * * * PERSISTENT_SEQUENTIAL:持久顺序节点
* * * * * * EPHEMERAL:临时节点
* * * * * * EPHEMERAL_SEQUENTIAL:临时顺序节点
* String node = zookeeper.create(path,data,acl,createMode);
*/
public String createNoteSync(ZooKeeper zooKeeper, String path, String data, List<ACL> acl, CreateMode createMode) {
String note = "";
try {
note = zooKeeper.create(path, data.getBytes(), acl, createMode);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return note;
}
/**
* 修改节点
* <p>
* path:路径
* data:要修改的内容 byte[]
* version:为-1,表示对最新版本的数据进行修改
* zooKeeper.setData(path, data,version);
*
* @param zooKeeper
*/
public Stat updateNoteSync(ZooKeeper zooKeeper, String path, String data) {
Stat stat = null;
try {
stat = zooKeeper.setData(path, data.getBytes(), -1);
} catch (Exception e) {
e.printStackTrace();
}
return stat;
}
/**
* 获取某个节点的内容
* <p>
* path: 获取数据的路径
* watch: 是否开启监听
* stat: 节点状态信息
* null: 表示获取最新版本的数据
* zk.getData(path, watch, stat);
*
* @param zooKeeper
*/
public String getNoteData(ZooKeeper zooKeeper, String path) {
String res = "";
try {
byte[] data = zooKeeper.getData(path, true, null);
res = new String(data);
} catch (Exception e) {
e.printStackTrace();
}
return res;
}
/**
* 获取某个节点的子节点列表
* <p>
* path:路径
* watch:是否要启动监听,当子节点列表发生变化,会触发监听
* zooKeeper.getChildren(path, watch);
*
* @param zooKeeper
*/
public List<String> getChildrens(ZooKeeper zooKeeper, String path) {
List<String> childrens = null;
try {
childrens = zooKeeper.getChildren(path, true);
} catch (Exception e) {
e.printStackTrace();
}
return childrens;
}
/**
* 删除节点的方法
* <p>
* zookeeper.delete(path,version) : 删除节点
*
* @param zooKeeper
* @param path
*/
public void deleteNoteSync(ZooKeeper zooKeeper, String path) {
try {
Stat stat = getNoteExists(zooKeeper, path);
if (stat != null) {
zooKeeper.delete(path, -1);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 节点是否存在
* <p>
* zooKeeper.exists(path,watch) :判断节点是否存在
*
* @param zooKeeper
* @param path
*/
public Stat getNoteExists(ZooKeeper zooKeeper, String path) {
Stat stat = null;
try {
stat = zooKeeper.exists(path, true);
} catch (Exception e) {
e.printStackTrace();
}
return stat;
}
}
ZkClient是Github上⼀个开源的zookeeper客户端,在Zookeeper原⽣API接⼝之上进⾏了包装,是⼀个更易⽤的Zookeeper客户端,同时,zkClient在内部还实现了诸如Session超时重连、Watcher反复注册等功能
所需依赖:
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.2</version>
</dependency>
示例代码:ZkClient示例代码
package com.ozdemo.zkdemo.zkClient;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.List;
import static java.lang.Integer.MAX_VALUE;
/**
* @Description: zkClient测试类
* @Author: Created by OrangeZh
* @Date: Created in 2020/10/21 15:23
*/
public class ZkClientTest {
public static void main(String[] args) throws InterruptedException {
ZkClient zkClient = new ZkClient("127.0.0.1:2181");
//获取子节点列表
List<String> children = zkClient.getChildren("/");
System.out.println(children.toString());
//节点创建
//判断节点是否存在
String path = "/oz-zkClient1";
boolean exists = zkClient.exists(path);
if (!exists) {
zkClient.createPersistent(path, "oz-zkClient1-data");
Object object = zkClient.readData(path);
System.out.println("创建节点内容:" + object);
}
//注册监听
zkClient.subscribeDataChanges(path, new IZkDataListener() {
public void handleDataChange(String path, Object data) throws Exception {
System.out.println(path + " 该节点被更新,更新后的内容:" + data);
}
public void handleDataDeleted(String s) throws Exception {
System.out.println(s + " 该节点被删除");
}
});
//节点更新
zkClient.writeData(path, "update data");
//获取节点内容
Object o = zkClient.readData(path);
System.out.println("节点更新后内容:" + o);
//节点删除
zkClient.deleteRecursive(path);
exists = zkClient.exists(path);
if (!exists) {
System.out.println("删除节点:path:" + path);
}
Thread.sleep(MAX_VALUE);
}
}
curator是Netflix公司开源的⼀套Zookeeper客户端框架,和ZKClient⼀样,Curator解决了很多Zookeeper客户端⾮常底层的细节开发⼯作,包括连接重连,反复注册Watcher和NodeExistsException异常等,是最流⾏的Zookeeper客户端之⼀。从编码⻛格上来讲,它提供了基于Fluent的编程⻛格⽀持
所需依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
示例代码:Curator示例代码
package com.ozdemo.zkdemo.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import java.util.List;
/**
* @Description: Curator测试
* @Author: Created by OrangeZh
* @Date: Created in 2020/10/22 11:44
*/
public class CuratorTest {
public static void main(String[] args) {
try {
/**
* 使用CuratorFramework工厂类的2个静态方法来创建客户端
* 其中参数RetryPolicy提供重试策略的接⼝,可以让⽤户实现⾃定义的重试策略,默认提供了以下实现,
* 分别为ExponentialBackoffRetry(基于backoff的重连策略)、RetryNTimes(重连N次策略)、RetryForever(永远重试策略)
*/
/*RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
client.start();*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 1000, retryPolicy);
client.start();
String pathIUserService = "/services/IUserService/providers";
List<String> childrenIUserService = client.getChildren().forPath(pathIUserService);
System.out.println(childrenIUserService);
// 删除节点
pathIUserService += "/0.0.0.0";
client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(pathIUserService);
System.out.println("删除成功,删除的节点" + pathIUserService);
// 创建节点
String path = "/oz-curator/c1";
//检测节点是否存在
Stat stat = client.checkExists().forPath(path);
if(stat == null){
String s = client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes());
System.out.println("节点递归创建成功,该节点路径" + s);
}
// 获取节点数据内容
byte[] bytes = client.getData().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听器watchedEvent:" + watchedEvent);
}
}).forPath(path);
System.out.println("监听节点内容:" + new String(bytes));
// 获取子节点列表并监听
List<String> servicesList = client.getChildren().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
System.out.println("-- 监听器watchedEvent:" + watchedEvent);
String watchedPath = watchedEvent.getPath(); //监听到的path
Event.EventType watchedEventType = watchedEvent.getType(); //监听类型
Event.KeeperState watchedKeeperState = watchedEvent.getState(); //状态
System.out.println("watchedEventType:" + watchedEventType + "-- watchedPath:" + watchedPath);
//判断连接是否成功
if (watchedKeeperState == Event.KeeperState.SyncConnected) {
//事件监听
if (null != watchedEventType) {
switch (watchedEventType) {
case NodeCreated:
System.out.println("-- 节点创建成功:-- watchedPath:" + watchedPath);
break;
case NodeDataChanged:
byte[] noteData = client.getData().forPath(watchedPath);
System.out.println("-- 节点内容变更成功:-- watchedPath:" + watchedPath + " -- 变更内容:" + String.valueOf(noteData));
break;
case NodeChildrenChanged:
List<String> servicesList1 = client.getChildren().forPath("/oz-curator");
System.out.println("-- 节点子节点变化:-- watchedPath:" + watchedPath + " -- 变更内容:" + servicesList1);
break;
case NodeDeleted:
System.out.println("-- 节点删除成功:-- watchedPath:" + watchedPath);
break;
case None:
break;
}
}
}else {
System.out.println("-- 客户端与服务端会话建立失败··· watchedKeeperState:" + watchedKeeperState);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).forPath("/oz-curator");
System.out.println("监听子节点列表:" +servicesList);
// 第一次变更节点数据
client.setData().forPath(path,"new content".getBytes());
// 第二次变更节点数据
client.setData().forPath(path,"second content".getBytes());
System.out.println("获取到的节点数据内容:" + new String(bytes));
// 创建节点
String path2 = "/oz-curator/c2";
//检测节点是否存在
Stat stat2 = client.checkExists().forPath(path2);
if(stat2 == null){
String s = client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path2, "init".getBytes());
System.out.println("节点递归创建成功,该节点路径" + s);
}
// 获取节点状态信息
stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
System.out.println("获取到的节点状态信息:" + stat);
// 更新节点内容 //1
int version = client.setData().withVersion(stat.getVersion()).forPath(path, "修改内容1".getBytes()).getVersion();
System.out.println("当前的最新版本是:" + version);
byte[] bytes2 = client.getData().forPath(path);
System.out.println("修改后的节点数据内容:" + new String(bytes2));
// 删除节点
path = "/oz-curator/c2";
client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path);
System.out.println("删除成功,删除的节点" + path);
} catch (Exception e) {
e.printStackTrace();
}
}
}
项目源码:去看源码
1、启动服务提供者时需要将服务提供者注册到zookeeper注册中心中
//服务注册 /oz-services/serviceName/ip:port
ZkServerManager.getInstance().registerProvidersByServiceName("IUserService", inetSocketAddress);
/**
* 服务注册中心 path
*/
private String servicesPath = "/services";
/**
* 服务提供者注册中心 path
*/
private String serviceProvidersPath = "/providers";
/**
* 根据服务名称注册提供者
*
* @param serviceName
*/
public void registerProvidersByServiceName(String serviceName, InetSocketAddress inetSocketAddress) {
try {
//1、创建zk客户端
zkClient = getZkClient();
//2、创建提供者临时节点 先创建主节点为持久节点,再创建子节点为临时节点
//先创建主节点为持久节点
String consumersPath = servicesPath + "/" + serviceName + serviceProvidersPath;
Stat consumersPathStat = zkClient.checkExists().forPath(consumersPath);
if (consumersPathStat == null) {
String s = zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(consumersPath, String.valueOf(System.currentTimeMillis()).getBytes());
System.out.println("主节点递归创建成功,该节点路径" + s);
}
//再创建子节点为临时节点
consumersPath += inetSocketAddress.getAddress() + ":" + inetSocketAddress.getPort();
consumersPathStat = zkClient.checkExists().forPath(consumersPath);
if (consumersPathStat == null) {
String s = zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(consumersPath, String.valueOf(System.currentTimeMillis()).getBytes());
System.out.println("子节点递归创建成功,该节点路径" + s);
}
} catch (Exception e) {
e.printStackTrace();
}
}
2、消费者启动后需要从注册中心中拉取需要的服务提供者,并缓存起来
//通过服务名缓存提供者列表,并监听
String serviceName = serviceClass.getSimpleName();
ZkClientManager.getInstance().cacheProvidersByServiceName(serviceName);
/**
* 服务注册中心 path
*/
private String servicesPath = "/services";
/**
* 服务提供者注册中心 path
*/
private String serviceProvidersPath = "/providers";
/**
* 服务消费者者注册中心 path
*/
private String serviceConsumersPath = "/consumers";
/**
* 注册中心(zk)服务器列表缓存 serviceName:providerList
*/
private Map<String, List<String>> providersMap = new HashMap<>();
public Map<String, List<String>> getProvidersMap() {
return providersMap;
}
public List<String> getProvidersByServiceName(String serviceName) {
return providersMap.get(serviceName);
}
public void setProvidersMap(Map<String, List<String>> providersMap) {
this.providersMap = providersMap;
}
/**
* 根据服务名称缓存并监听提供者
*
* @param serviceName
*/
public void cacheProvidersByServiceName(String serviceName) {
try {
//1、服务提供者列表为空就去zk拉一次 并将列表缓存起来
List<String> servicesList = providersMap.get(serviceName);
if (CollectionUtils.isEmpty(servicesList)) {
//1)、创建zk客户端
zkClient = getZkClient();
//2)、获取子节点列表并监听 path = "/services/"+ serviceName + serviceProvidersPath
String providersListPath = servicesPath + "/" + serviceName + serviceProvidersPath;
providerLWatcher(serviceName, providersListPath);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 服务提供者变化监听
*
* @param serviceName
* @param providersListPath
* @return
* @throws Exception
*/
private void providerLWatcher(String serviceName, String providersListPath) throws Exception {
List<String> changeServicesList = zkClient.getChildren().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
System.out.println("-- 监听器watchedEvent:" + watchedEvent);
String watchedPath = watchedEvent.getPath(); //监听到的path
Event.EventType watchedEventType = watchedEvent.getType(); //监听类型
Event.KeeperState watchedKeeperState = watchedEvent.getState(); //状态
System.out.println("watchedEventType:" + watchedEventType + "-- watchedPath:" + watchedPath);
//判断连接是否成功
if (watchedKeeperState == Event.KeeperState.SyncConnected) {
//事件监听
if (null != watchedEventType) {
switch (watchedEventType) {
case NodeChildrenChanged:
providerLWatcher(serviceName, providersListPath);
break;
}
}
} else {
zkClient = null;
System.out.println("客户端与服务端会话建立失败··· watchedKeeperState:" + watchedKeeperState);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).forPath(providersListPath);
providersMap.put(serviceName, changeServicesList);
//创建Rpc客户端连接 初始化客户端client 刷新channel
RpcServiceConsumer.getInstance().createChannelByServiceName(serviceName);
System.out.println("-- 服务提供者列表变化: -- 变更内容:" + changeServicesList);
}
1、根据服务提供者列表建立与提供者的连接 每个提供者都建立一个连接(Channel)
/**
* @Description: channel连接
* @Author: Created by OrangeZh
* @Date: Created in 2020/10/28 15:06
*/
public class ChannelSession {
private String serverNode;
private ChannelFuture channelFuture;
private ClientRpcHandler clientRpcHandler;
private long lastResponseSystemTime; //上次请求的响应系统时间
private long lastResponseTime; //上次请求的响应时间
public String getServerNode() {
return serverNode;
}
public void setServerNode(String serverNode) {
this.serverNode = serverNode;
}
public ChannelFuture getChannelFuture() {
return channelFuture;
}
public void setChannelFuture(ChannelFuture channelFuture) {
this.channelFuture = channelFuture;
}
public ClientRpcHandler getClientRpcHandler() {
return clientRpcHandler;
}
public void setClientRpcHandler(ClientRpcHandler clientRpcHandler) {
this.clientRpcHandler = clientRpcHandler;
}
public long getLastResponseSystemTime() {
return lastResponseSystemTime;
}
public void setLastResponseSystemTime(long lastResponseSystemTime) {
this.lastResponseSystemTime = lastResponseSystemTime;
}
public long getLastResponseTime() {
return lastResponseTime;
}
public void setLastResponseTime(long lastResponseTime) {
this.lastResponseTime = lastResponseTime;
}
}
/**
* @Description: 客户端自定义处理类
* @Author: Created by OrangeZh
* @Date: Created in 2020/10/17 16:45
*/
public class ClientRpcHandler extends ChannelInboundHandlerAdapter implements Callable {
//1.定义成员变量
private ChannelHandlerContext context; //事件处理器上下文对象 (存储handler信息,写操作)
private RpcResponse rpcResponse; // 记录服务器返回的数据
private RpcRequest rpcRequest; //记录将要发送给服务器的数据
private String serverNode;
//2.实现channelActive 客户端和服务器连接时,该方法就自动执行
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//初始化ChannelHandlerContext
this.context = ctx;
}
//3.实现channelRead 当我们读到服务器数据,该方法自动执行
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//将读到的服务器的数据msg ,设置为成员变量的值
RpcResponse rpcResponse = (RpcResponse) msg;
if (rpcResponse.getOpcodeType() != RpcOpcodeType.HeartbeatResponse) {
this.rpcResponse = rpcResponse;
}
//存储服务提供者最后一次响应时间到chanelSession
long nowTime = System.currentTimeMillis();
ChannelSession channelSession = RpcServiceConsumer.getInstance().getChannelSessionByServerNode(serverNode);
channelSession.setLastResponseSystemTime(nowTime);
channelSession.setLastResponseTime(nowTime - rpcRequest.getRequestSystemTime());
notify();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
//删除ChannelSession
RpcServiceConsumer.getInstance().removeChannelSession(serverNode);
notify();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.READER_IDLE)) {
// 超时关闭channel
ctx.close();
//删除ChannelSession
RpcServiceConsumer.getInstance().removeChannelSession(serverNode);
} else if (event.state().equals(IdleState.WRITER_IDLE)) {
// 发送心跳
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setRequestSystemTime(System.currentTimeMillis());
rpcRequest.setOpcodeType(RpcOpcodeType.HeartbeatRequest);
this.rpcRequest = rpcRequest;
ctx.writeAndFlush(rpcRequest);
} else if (event.state().equals(IdleState.ALL_IDLE)) {
System.out.println("ALL_IDLE");
}
} else {
super.userEventTriggered(ctx, evt);
}
}
//4.将客户端的数写到服务器
public synchronized Object call() throws Exception {
//context给服务器写数据
context.writeAndFlush(rpcRequest);
wait();
return rpcResponse;
}
//5.设置参数的方法
public void setRpcRequest(RpcRequest rpcRequest) {
this.rpcRequest = rpcRequest;
}
public void setServerNode(String serverNode) {
this.serverNode = serverNode;
}
}
/**
* 1.创建一个线程池对象 -- 它要处理我们自定义事件
*/
private ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
/**
* channel会话缓存
*/
private ConcurrentMap<String, ChannelSession> channelSessionMap = new ConcurrentHashMap<>();
public ConcurrentMap<String, ChannelSession> getChannelSessionMap() {
return channelSessionMap;
}
public ChannelSession getChannelSessionByServerNode(String serverNode) {
return channelSessionMap.getOrDefault(serverNode, null);
}
public void setChannelSessionMap(ConcurrentMap<String, ChannelSession> channelSessionMap) {
this.channelSessionMap = channelSessionMap;
}
public void removeChannelSession(String serverNode) {
channelSessionMap.remove(serverNode);
System.out.println("删除" + serverNode + " channelSession!");
}
private EventLoopGroup workGroup;
/**
* 初始化Bootstrap
*/
private Bootstrap getBootstrap() {
//1)创建连接池对象
if (null == workGroup) {
workGroup = new NioEventLoopGroup();
}
//2)创建客户端的引导对象
Bootstrap bootstrap = new Bootstrap();
//3)配置启动引导对象
bootstrap.group(workGroup)
//设置通道为NIO
.channel(NioSocketChannel.class)
//设置请求协议为TCP
.option(ChannelOption.TCP_NODELAY, true);
return bootstrap;
}
/**
* 根据服务名称创建channel
*/
public void createChannelByServiceName(String serviceName) {
Bootstrap bootstrap = getBootstrap();
//获取当前服务名称下的服务器列表
List<String> servicesList = ZkClientManager.getInstance().getProvidersByServiceName(serviceName);
if (!CollectionUtils.isEmpty(servicesList)) {
for (String serverNode : servicesList) {
if(channelSessionMap.containsKey(serverNode)){
System.out.println("与提供者:" + serverNode + " 连接已存在!");
continue;
}
doConnect(serviceName, serverNode, bootstrap);
}
}
}
/**
* 建立连接
*
* @param serviceName
* @param serverNode
* @param bootstrap
*/
private void doConnect(String serviceName, String serverNode, Bootstrap bootstrap) {
try {
//1) 初始化UserClientHandler
ClientRpcHandler clientRpcHandler = new ClientRpcHandler();
clientRpcHandler.setServerNode(serverNode);
//2) 初始化channel
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
//获取ChannelPipeline
ChannelPipeline pipeline = socketChannel.pipeline();
//每5秒进行一次写检测,如果5秒内write()方法未被调用则触发一次userEventTrigger()方法,实现客户端每5秒向服务端发送一次消息;
pipeline.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
//设置编码
pipeline.addLast(new RpcEncoder(RpcRequest.class, new JsonSerializer()));
pipeline.addLast(new RpcDecoder(RpcResponse.class, new JsonSerializer()));
//添加自定义事件处理器
pipeline.addLast(clientRpcHandler);
}
});
//3) 解析连接地址
String[] strings = serverNode.split(":");
InetSocketAddress socketAddress = new InetSocketAddress(strings[0], Integer.parseInt(strings[1]));
//4) 建立连接通道
ChannelFuture channelFuture = bootstrap.connect(socketAddress).sync();
final EventLoop eventLoop = channelFuture.channel().eventLoop();
//5) 重连判断
if (!channelFuture.isSuccess()) {
//服务器未启动 连接tcp服务器不成功
System.out.println(serverNode + "与服务端断开连接!在5s之后准备尝试重连!");
//5秒后重连
eventLoop.schedule(() -> doConnect(serviceName, serverNode, bootstrap), 5, TimeUnit.SECONDS);
} else {
//6) 注册到消费者列表
ZkClientManager.getInstance().registerConsumerByServiceName(serviceName);
//7) 缓存连接信息
ChannelSession channelSession = new ChannelSession();
channelSession.setServerNode(serverNode);
channelSession.setChannelFuture(channelFuture);
channelSession.setClientRpcHandler(clientRpcHandler);
channelSessionMap.put(serverNode, channelSession);
System.out.println("与提供者:" + serverNode + " 建立连接成功!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
2、每次调用提供者时负载均衡的实现,目前按照相应时间最短来做,时间一致就随机
/**
* 负载均衡
* 响应时间最短的,响应时间一样就随机
*
* @return
*/
public String providerBalanceFastProvider(String serviceName) {
String providerServerNode = "";
List<String> serverList = providersMap.get(serviceName);
if (!CollectionUtils.isEmpty(serverList)) {
Map<Long, List<String>> responseTimeMap = new HashMap<>();
long minLastResponseTime = Integer.MAX_VALUE;
for (String serverNode : serverList) {
ChannelSession channelSession = RpcServiceConsumer.getInstance().getChannelSessionByServerNode(serverNode);
if (channelSession == null) {
continue;
}
long lastResponseTime = channelSession.getLastResponseTime();
System.out.println("负载均衡 -- 提供者:" + serverNode + "最后一次请求耗时为:" + channelSession.getLastResponseTime());
List<String> serverNodeList = responseTimeMap.getOrDefault(channelSession.getLastResponseTime(), new ArrayList<>());
serverNodeList.add(serverNode);
responseTimeMap.put(channelSession.getLastResponseTime(), serverNodeList);
if (lastResponseTime < minLastResponseTime) {
minLastResponseTime = lastResponseTime;
}
}
List<String> minLastResponseTimeServerNodeList = responseTimeMap.get(minLastResponseTime);
if (CollectionUtils.isEmpty(minLastResponseTimeServerNodeList)) { //如果为空,那就在serverList随机
providerServerNode = providerBalanceRandomProvider(serverList);
System.out.println("负载均衡 -- 提供者:" + serverList + " 进行随机负载");
} else if (minLastResponseTimeServerNodeList.size() > 1) { //如果有多个服务器响应时间一样,那就在minLastResponseTimeServerNodeList随机
providerServerNode = providerBalanceRandomProvider(minLastResponseTimeServerNodeList);
System.out.println("负载均衡 -- 提供者:" + minLastResponseTimeServerNodeList + " 最后一次请求耗时一样,进行随机负载");
} else { //如果只有一个服器响应时间最短,那就直接用这个
providerServerNode = minLastResponseTimeServerNodeList.get(0);
System.out.println("负载均衡 -- 提供者:" + providerServerNode + " 最后一次请求耗时最短");
}
}
return providerServerNode;
}
/**
* 负载均衡
* 随机负载
*
* @param serverList
* @return
*/
public String providerBalanceRandomProvider(List<String> serverList) {
String providerServerNode = "";
if (!CollectionUtils.isEmpty(serverList)) {
Random random = new Random();
providerServerNode = serverList.get(random.nextInt(serverList.size()));
}
return providerServerNode;
}
/**
* 负载均衡
* 随机负载
*
* @param serviceName
* @return
*/
public String providerBalanceRandomProvider(String serviceName) {
String providerServerNode = "";
List<String> serverList = providersMap.get(serviceName);
if (!CollectionUtils.isEmpty(serverList)) {
Random random = new Random();
providerServerNode = serverList.get(random.nextInt(serverList.size()));
}
return providerServerNode;
}
动态配置就是通过zookeeper的节点数据监听来进行实现,当配置有更改时拉取最新配置并重载 主要实现代码:(就是节点的数据监听)
/**
* 配置中心 path
*/
private String configsPath = "/configs";
/**
* 将配置注册到zookeeper
*
* @param data
*/
public Object setConfigData(Object data) {
try {
//1、创建zk客户端
zkClient = getZkClient();
//2、创建配置节点
//先创建主节点为持久节点
String configPath = configsPath + "/" + data.getClass().getSimpleName();
Stat consumersPathStat = zkClient.checkExists().forPath(configPath);
if (consumersPathStat == null) {
String s = zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(configPath, JsonSerializer.serialize(data));
System.out.println("节点递归创建成功,该节点路径" + s);
} else {
zkClient.setData().forPath(configPath, JsonSerializer.serialize(data));
}
return dataWatcher(data, configPath);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 监听当前配置
*
* @param data
* @param configPath
* @throws Exception
*/
private Object dataWatcher(Object data, String configPath) throws Exception {
byte[] noteData = zkClient.getData().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
System.out.println("-- 监听器watchedEvent:" + watchedEvent);
String watchedPath = watchedEvent.getPath(); //监听到的path
Event.EventType watchedEventType = watchedEvent.getType(); //监听类型
Event.KeeperState watchedKeeperState = watchedEvent.getState(); //状态
System.out.println("watchedEventType:" + watchedEventType + "-- watchedPath:" + watchedPath);
//判断连接是否成功
if (watchedKeeperState == Event.KeeperState.SyncConnected) {
//事件监听
if (null != watchedEventType) {
switch (watchedEventType) {
case NodeDataChanged:
dataWatcher(data, configPath);
break;
}
}
} else {
zkClient = null;
System.out.println("客户端与服务端会话建立失败··· watchedKeeperState:" + watchedKeeperState);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).forPath(configPath);
Object newData = JsonSerializer.deserialize(data.getClass(), noteData);
System.out.println("-- 服务提供者列表变化: -- 变更内容:" + newData.toString());
return newData;
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。