# MicroService

**Repository Path**: fortune-teller/MicroService

## Basic Information

- **Project Name**: MicroService
- **Description**: No description available
- **Primary Language**: Java
- **License**: MIT
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2020-07-19
- **Last Updated**: 2020-12-19

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

#### Gateway module

##### Spring WebFlux and Spring WebMVC

Compared with WebMVC, WebFlux's non-blocking, reactive model typically sustains higher throughput under heavy concurrent, I/O-bound load, and its functional style can be more concise.

WebFlux also keeps much of WebMVC's coding style, such as annotated controllers. `HandlerInterceptor` itself is a Servlet-stack (WebMVC) interface, so this project adapts it through its plugin abstraction.

This gateway aims to combine WebFlux's performance with as much compatibility with WebMVC's extension mechanisms as possible.

The sections below show how to build a request chain from WebFlux's `WebHandler` and WebMVC's `HandlerInterceptor`.

![](https://images.gitee.com/uploads/images/2020/0729/232459_953fba67_7745127.png)
![](https://images.gitee.com/uploads/images/2020/0729/232459_2408d0dd_7745127.png)

##### Plugin-based routing

![](https://images.gitee.com/uploads/images/2020/0729/232459_e0ccf4be_7745127.png)

```java
public final class GatewayHandler implements WebHandler {

    private final List<Plugin> plugins; // all registered plugins

    public GatewayHandler(final List<Plugin> plugins) {
        this.plugins = plugins;
    }

    @Override // entry point of every request
    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
        return new DefaultPluginChain(plugins)
                .execute(exchange);
                // .subscribeOn(scheduler)
                // .doOnSuccess(t -> {});
    }

    private static class DefaultPluginChain implements PluginChain, WebMvcConfigurer {

        private int index;
        private final List<Plugin> plugins;

        DefaultPluginChain(final List<Plugin> plugins) {
            this.plugins = plugins;
        }

        @Override // run the next plugin in the chain
        public Mono<Void> execute(final ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < plugins.size()) {
                    Plugin plugin = plugins.get(this.index++);
                    // decide per request whether this plugin should be skipped
                    Boolean skip = plugin.skip(exchange);
                    if (skip) {
                        return this.execute(exchange);
                    } else {
                        return plugin.execute(exchange, this);
                    }
                } else {
                    return Mono.empty();
                }
            });
        }

        @Override // adapt plugins to the WebMVC HandlerInterceptor style
        public void addInterceptors(InterceptorRegistry registry) {
            plugins.forEach(plugin -> {
                if (Objects.nonNull(plugin.getInterceptor())) {
                    registry.addInterceptor(plugin.getInterceptor());
                }
            });
        }
    }
}

public interface Plugin {

    default HandlerInterceptor getInterceptor() {
        return null; // no WebMVC-style interceptor by default
    }

    default Mono<Void> execute(ServerWebExchange exchange, PluginChain chain) {
        // pass through to the next plugin by default;
        // returning null here would break the reactive chain
        return chain.execute(exchange);
    }

    int getOrder(); // execution order

    default String named() {
        return "";
    }

    default Boolean skip(ServerWebExchange exchange) {
        return false; // not skipped by default
    }
}
```

##### Access control

```java
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Authorize {
    String[] value();
}

public class AuthorizePlugin implements Plugin {

    @Override
    public HandlerInterceptor getInterceptor() {
        return new HandlerInterceptor() {
            @Override
            public boolean preHandle(HttpServletRequest request,
                                     HttpServletResponse response,
                                     Object handler) throws Exception {
                if (!(handler instanceof HandlerMethod)) {
                    return true;
                }
                HandlerMethod handlerMethod = (HandlerMethod) handler;
                Authorize authorize = handlerMethod.getMethod().getAnnotation(Authorize.class);
                if (authorize == null) {
                    return true; // no need to authorize
                }
                String[] allowedHeaders = authorize.value();
                // read the caller's permission from the request header;
                // the check fails if the header is missing
                String authzHeader = request.getHeader(AuthConstant.AUTHORIZATION_HEADER);
                if (StringUtils.isEmpty(authzHeader)) {
                    throw new RuntimeException(AuthConstant.ERROR_MSG_MISSING_AUTH_HEADER);
                }
                if (!Arrays.asList(allowedHeaders).contains(authzHeader)) {
                    throw new RuntimeException(AuthConstant.ERROR_MSG_DO_NOT_HAVE_ACCESS);
                }
                return true;
            }
        };
    }

    @Override
    public int getOrder() {
        return 1;
    }
}
```

##### Load balancing

```java
@Slf4j
public class DividePlugin implements Plugin {

    @Autowired
    private ServiceRegisterAndDiscover serviceRegisterAndDiscover;

    @Override
    public Mono<Void> execute(ServerWebExchange exchange, PluginChain chain) {
        // fetch the request context
        final WebContext context = exchange.getAttribute(Constants.CONTEXT);
        // resolve the concrete service address through service registration and discovery
        String address = serviceRegisterAndDiscover.discover(context,
                LoadBalanceEnum.RANDOM.getLoadBalance()); // random load balancing by default
        // store the resolved address in the exchange attributes
        exchange.getAttributes().put(Constants.HTTP_URL, address);
        return chain.execute(exchange);
    }

    @Override
    public int getOrder() {
        return 2;
    }
}

public interface LoadBalance { // load-balancing strategy interface
    String load(WebContext webContext);
}

@AllArgsConstructor
public enum LoadBalanceEnum { // strategy pattern: one constant per load-balancing algorithm
    RANDOM(new RandomLoadBalance()),
    HASH(new HashLoadBalance());

    @Getter
    private final LoadBalance loadBalance;
}

@Slf4j
public class RandomLoadBalance implements LoadBalance {

    @Override
    public String load(WebContext webContext) {
        String serviceName = webContext.getServiceName();
        int addressCacheSize = ServicesHolder.addressSize(serviceName);
        String address = null;
        if (addressCacheSize > 0) {
            if (addressCacheSize == 1) { // a single node needs no selection
                address = ServicesHolder.getAddress(serviceName, 0);
            } else { // pick a node at random
                address = ServicesHolder.getAddress(serviceName,
                        ThreadLocalRandom.current().nextInt(addressCacheSize));
            }
            log.info(">>>get only address node:" + address);
        }
        return address;
    }
}

@Slf4j
public class HashLoadBalance implements LoadBalance {

    @Override
    public String load(WebContext webContext) {
        String serviceName = webContext.getServiceName();
        String balanceKey = webContext.getBalanceKey();
        int addressCacheSize = ServicesHolder.addressSize(serviceName);
        String address = null;
        if (addressCacheSize > 0) {
            if (addressCacheSize == 1) { // a single node needs no selection
                address = ServicesHolder.getAddress(serviceName, 0);
            } else {
                // hash-modulo selection (simpler than a consistent-hash ring);
                // floorMod keeps the index non-negative when hashCode() is negative
                address = ServicesHolder.getAddress(serviceName,
                        Math.floorMod(balanceKey.hashCode(), addressCacheSize));
            }
            log.info(">>>get only address node:" + address);
        }
        return address;
    }
}
```

##### Rate limiting

```java
@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimit {

    String description() default "";

    String key() default ""; // custom bucket key

    LimitType limitType() default LimitType.IP; // bucket by client IP by default

    enum LimitType {
        CUSTOMER,
        IP
    }
}

@Aspect
@Configuration
public class
LimitAspect {

    // one token bucket per key (the client IP by default);
    // entries expire one day after they are written
    private static final LoadingCache<String, RateLimiter> caches = CacheBuilder.newBuilder()
            .maximumSize(1000)
            .expireAfterWrite(1, TimeUnit.DAYS)
            .build(new CacheLoader<String, RateLimiter>() {
                @Override
                public RateLimiter load(String key) {
                    // a new key gets a limiter that issues 20 permits per second
                    return RateLimiter.create(20);
                }
            });

    @Pointcut("@annotation(com.elegant.code.aop.limit.RateLimit)")
    public void ServiceAspect() {}

    @Around("ServiceAspect()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        RateLimit limitAnnotation = method.getAnnotation(RateLimit.class);
        RateLimit.LimitType limitType = limitAnnotation.limitType();
        String key = limitAnnotation.key();
        if (limitType == RateLimit.LimitType.IP) {
            key = IPUtils.getIpAddr();
        }
        RateLimiter rateLimiter = caches.getUnchecked(key);
        if (!rateLimiter.tryAcquire()) {
            throw new RuntimeException("Requests are too frequent");
        }
        // exceptions thrown by the target method propagate unchanged
        // instead of being reported as a rate-limit error
        return joinPoint.proceed();
    }
}
```

#### rpc

##### Service registry

![](https://images.gitee.com/uploads/images/2020/0729/232459_9dd6646e_7745127.png)

```java
public interface ServiceRegisterAndDiscover {

    // register a concrete address under the given service name
    void registryAddress(String serviceName, String address);

    // discover an address for the service named in the context;
    // the strategy is supplied by the load-balancing plugin
    String discover(WebContext webContext, LoadBalance loadBalance);
}

@UtilityClass
public class ServicesHolder {

    private String registryPath = Constants.ZK_REGISTRY;

    @Getter
    private List<String> serviceNames = new CopyOnWriteArrayList<>();

    private ConcurrentSkipListMap<String, String> servicePaths = new ConcurrentSkipListMap<>();

    private ConcurrentSkipListMap<String, List<String>> addresses = new ConcurrentSkipListMap<>();

    // static initializer, placed after the fields so they are initialized first;
    // an instance initializer would never run in a utility class
    static {
        addServiceName("order");
        addServiceName("product");
        addServiceName("pay");
    }

    // helper methods used elsewhere (addServiceName, addServicePath, getServicePath,
    // addAddress, getAddress, addressSize) are omitted from this excerpt
}

@Slf4j
@Service
public class ServiceRegisterAndDiscoverImpl implements ServiceRegisterAndDiscover,
        // startup event of the reactive (WebFlux) web server, not the servlet container's
        ApplicationListener<ReactiveWebServerInitializedEvent> {

    @Autowired
    private ZkClient
            zkClient;

    private String registryPath = Constants.ZK_REGISTRY;

    @Override
    public void onApplicationEvent(ReactiveWebServerInitializedEvent event) {
        // the root path and the service paths are registered while the container starts;
        // the set of services is fixed up front, so both methods are private
        registryRootPath();
        registryServicePath();
        // at this point no real address has been registered yet
        // the ZooKeeper calls are not explained in detail; compare them with the diagram
    }

    private void registryRootPath() {
        if (!zkClient.exists(registryPath)) {
            zkClient.createPersistent(registryPath);
            log.info(">>> create registry node:" + registryPath);
        }
    }

    private void registryServicePath() {
        ServicesHolder.getServiceNames()
                .stream()
                .map(ServicesHolder::addServicePath)
                .filter(servicePath -> !zkClient.exists(servicePath))
                // forEach is a terminal operation; with peek alone the stream never runs
                .forEach(servicePath -> {
                    // create the (persistent) service node
                    zkClient.createPersistent(servicePath);
                    log.info(">>>create service node:" + servicePath);
                });
    }

    @Override // register a concrete address under a service name; part of the public API
    public void registryAddress(String serviceName, String address) {
        String servicePath = ServicesHolder.addAddress(serviceName, address);
        String addressNode = zkClient.createPersistentSequential(servicePath, address);
        log.info(">>> create address node:" + addressNode);
    }

    @Override
    public String discover(WebContext webContext, LoadBalance loadBalance) {
        String serviceName = webContext.getServiceName();
        String servicePath = ServicesHolder.getServicePath(serviceName);
        // look up the service node
        if (!zkClient.exists(servicePath)) {
            // String.format needs %s; {} is an SLF4J placeholder
            throw new RuntimeException(
                    String.format(">>>can't find any service node on path %s", servicePath));
        }
        // pick an address from the local cache;
        // each load-balancing strategy selects the address differently
        String address = loadBalance.load(webContext);
        if (Objects.isNull(address)) {
            List<String> addressList = zkClient.getChildren(servicePath);
            ServicesHolder.addAddress(servicePath, addressList);
            // watch the children of servicePath and refresh the local registry cache
            // whenever they change
            zkClient.subscribeChildChanges(servicePath, (parentPath, currentChilds) -> {
                log.info(">>>servicePath is changed:" + parentPath);
                ServicesHolder.addAddress(servicePath, currentChilds);
            });
            if (CollectionUtils.isEmpty(addressList)) {
                throw new RuntimeException(
                        String.format(">>>can't find any address node on path %s", servicePath));
            }
            address = loadBalance.load(webContext);
            log.info(">>>get address node:" + address);
        }
        return address;
    }
}
```
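
The `HashLoadBalance` strategy shown in the load-balancing section selects a node by taking the key's `hashCode` modulo the number of cached addresses, so most keys move to a different node whenever the address list grows or shrinks. A consistent-hash ring keeps a key on the same node unless that node itself is removed. The following is only a minimal sketch of that technique, not code from this project: the class `ConsistentHashRing`, its methods, the virtual-node count and the MD5-based hash are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical consistent-hash ring; illustrative only, not part of the project. */
final class ConsistentHashRing {

    // ring position -> service address, kept sorted by position
    private final TreeMap<Long, String> ring = new TreeMap<>();
    // number of ring positions per address (assumed tuning parameter)
    private final int virtualNodes;

    ConsistentHashRing(List<String> addresses, int virtualNodes) {
        this.virtualNodes = virtualNodes;
        addresses.forEach(this::add);
    }

    /** Places the address on the ring at virtualNodes positions. */
    void add(String address) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(address + "#" + i), address);
        }
    }

    /** Removes every ring position that belongs to the address. */
    void remove(String address) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.remove(hash(address + "#" + i));
        }
    }

    /** Returns the first address clockwise from the key's position, or null if the ring is empty. */
    String select(String balanceKey) {
        if (ring.isEmpty()) {
            return null;
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(balanceKey));
        // wrap around to the start of the ring when no position is >= the key's hash
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** Folds the first 8 bytes of an MD5 digest into a long ring position. */
    private static long hash(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }
}
```

If something like this were wired into a `LoadBalance` implementation, `select(webContext.getBalanceKey())` could take the place of the modulo lookup, and the ring would need `add` and `remove` calls whenever the ZooKeeper child listener reports a change. More virtual nodes spread keys more evenly at the cost of a larger map.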