# cloud-alibaba-plus **Repository Path**: lhyf-org/cloud-alibaba-plus ## Basic Information - **Project Name**: cloud-alibaba-plus - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 0 - **Created**: 2021-07-25 - **Last Updated**: 2023-07-15 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 基于Spring Cloud Alibaba的练习项目 > 使用技术栈 - OpenFeign - Nacos - Sentinel - log4j2 - knife4j > TODO - 全局异常处理 - Sentinel流控规则持久化 - dashboard 需要将上报日志持久化 - 需要将配置推动到nacos,客户端监听nacos配置变更,刷新本地规则 - 相同限流规则,不同服务的限流配置在nacos中使用同一个配置文件 - 网关限流开发 - 日志配置 - 重新配置日志文件 - 单一配置文件打印不同服务名 - 彩色日志打印 - 并使用异步日志输出 - 启动打印 Nacos 配置 - 缓存与分布式锁 - RedisTemplate,RedisService 共享与多租户前缀配置 - RedisTemplate 复杂对象序列化与反序列化 - 启用Redisson连接Redis - 使用Redisson 作为分布式锁 - 使用Redis作为分布式锁在集群模式下会有什么问题? - Redis使用AP模型,可能在主节点加锁成功,尚未同步到从节点时宕机,此时从节点被选举为Leader,新的线程就能在从节点获取到锁 - 如果使用ZK是可以解决这个问题的(CP),只是性能可能没有Redis高 - RedLock - 分布式发号器 - 使用美团开源的 leaf-snowflake - 时间回拨问题, 如果在服务启动状态下回拨时间, 将直接导致发号失败 - 回拨时间后,再次启动,原本会出现报错,通过在启动时加上容器close() 方法,能缓解,但是如果容器的close()方法延迟调用,将会导致服务上传回拨后的时间到zookeeper, 从而响应zookeeper中存储的时间 - 数据库相关 - 字段自动填充 - 逻辑删除 - Mybatis-plus 提供的方法在查询时会自动带上逻辑删除字段 - 手写SQL查询,不会自动带上逻辑删除字段,需要自己指定 - 乐观锁 - 多租户与动态数据源 - 分页 - 字段加解密 - 字段脱敏 - 线程池与@Async注解线程池的处理 - 支持传递登录态 - 网关鉴权与登录态传递 - kafka相关 - 实现 traceID 在生产端与消费端传递 - 登录态在生产端与消费端传递 - 打印消息从进入Kafka到被消费所需时间耗时 - 接口文档 - knife4j - 入参校验 - 完善代码生成器对于实体类的生成 ## 服务端口分配 | 服务或节点名 | 端口 | 服务说明 | | --------------- |------| ------------- | | gateway-app | 8101 | 网关 | | gateway-doc | 8102 | 接口文档 | | server-auth | 8301 | 授权服务 | | server-user | 8302 | 用户服务 | | server-customer | 8303 | 客户服务 | | server-order | 8304 | 订单服务区 | | server-product | 8305 | 商品服务 | | server-kafka | 8306 | kafka消费服务 | | server-im | 8307 | 通信服务 | | server-job | 8308 | 定时任务 | ## 日志 ### log4j2 开启全局异步日志 *可能会使启动时的异常日志无法打印出来* !!! - 导入 disruptor 包 ```xml com.lmax disruptor 3.4.2 ``` - 在resources目录下新增`log4j2.component.properties`文件,用于配置全局异步 ```properties Log4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector ``` - 查看效果 在线程中将看到一个名为 Log4j2-TF-1-AsyncLogger 的线程 ## feign 若需要开启feign的fallback 与fallbackFactory 功能,需要开启服务的配置 ```yaml feign: sentinel: enabled: true ``` ## sentinel 使用nacos持久化 [参考](https://www.sonake.com/2019/12/16/Sentinel-Nacos%E5%AE%9E%E7%8E%B0%E8%A7%84%E5%88%99%E6%8C%81%E4%B9%85%E5%8C%96/) - 添加不同策略的Nacos拉取与推送类 `com.alibaba.csp.sentinel.dashboard.rule` - 修改 `com.alibaba.csp.sentinel.dashboard.rule.nacos.NacosConfig` - 修改dashboard依赖 - 修改微服务,加入依赖 ```xml com.alibaba.csp sentinel-datasource-nacos ``` - 修改微服务配置,加入配置 ```yaml cloud: sentinel: # 对应配置类 com.alibaba.cloud.sentinel.SentinelProperties transport: port: 8070 # 跟控制台交流的端口, 随意指定一个未使用的端口即可(控制台回调) dashboard: 127.0.0.1:8080 # 指定控制台服务的地址 datasource: flow: nacos: server-addr: ${spring.cloud.nacos.server-addr} dataId: ${spring.application.name}-flow-rules group-id: DEFAULT_GROUP namespace: ${spring.cloud.nacos.config.namespace} username: ${spring.cloud.nacos.username} password: ${spring.cloud.nacos.password} rule-type: flow data-type: json ``` ## Redisson [参考](https://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95#26-%E5%8D%95redis%E8%8A%82%E7%82%B9%E6%A8%A1%E5%BC%8F) ### 分布式锁使用 ```java public class MessageServiceImpl implements MessageService { @Autowired private RedissonClient redissonClient; @Override public String getMsg() { RLock lock = redissonClient.getLock("getmsg"); try { int waitTime = 5; //锁有效时间(秒) int leaseTime = 20; // 直接指定持有锁时间,无法启用看门狗功能,如果方法执行超过持有锁时间,还是会出现并发问题 boolean locked = lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS); if (locked) { log.info("线程获取锁成功:{}", Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(10); } else { log.warn("线程获取锁失败:{}", Thread.currentThread().getName()); } } catch (InterruptedException e) { log.error("{}", e); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } return "message"; } } ``` ## leaf-snowflake 高并发优化,如何榨干整台服务器 1.资源锁细粒度化 (如能使用行锁不要使用表锁) 2.业务使用锁细粒度化(能只锁部分代码,就不要锁整个方法) 3.无锁 (ThreadLocal 真无锁, CAS 假无锁) 4.异步+线程池 (CompletableFuture) ## Sleuth 链路追踪 1.自动配置类 TraceWebServletAutoConfiguration 注册一个Filter(TracingFilter),用于拦截请求 2.如果请求头中有相关的请求头,则使用传入的TraceID,否则创建一个新的 3.将Trace相关信息保存在ThreadLocal中(ThreadLocalCurrentTraceContext) 4.每次调用 Tracer 对象获取上下文时,就是从 ThreadLocal中获取 5.Trace 在请求头中传递的请求头名称查看 B3Propagation [b3-propagation](https://github.com/openzipkin/b3-propagation) ## kafka 1.生产者与消费者之间,登录态的传递 - 生产者使用 ProducerInterceptor 添加请求头, 使用 RecordInterceptor 取出请求头信息 - 生产者的添加 ProducerInterceptor 除了配置文件的方式外,可以通过定制化配置文件的方式添加 - 单个 RecordInterceptor 可以直接添加到容器中,如果要添加多个, 则需要将多个 RecordInterceptor ,则先添加到 CompositeRecordInterceptor,再将 CompositeRecordInterceptor 添加到容器中 - ConsumerInterceptor 一次传入多条消息,对于上下文这种不太合适,上下文类型的需求,需要单条消息处理 2.生产者与消费者之间,traceID的传递 TracingProducer 与 TracingConsumer 分别处理了 traceId 在kafka中的传递 3. 常用接口 - RecordInterceptor - RecordMessageConverter - KafkaHeaderMapper 4.配置 ```yaml spring: kafka: bootstrap-servers: 192.168.144.3:9092 listener: missing-topics-fatal: false consumer: auto: commit: interval: ms: 1000 auto-offset-reset: latest enable-auto-commit: true key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: group: id: defaultConsumerGroup request: timeout: ms: 180000 session: timeout: ms: 120000 producer: acks: 1 # 只要集群的Leader节点收到消息,生产者就会收到一个来自服务器的成功响应 batch-size: 16384 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer properties: linger: ms: 0 retries: 0 # 发送失败时,重试次数 ``` ## 基于Kafka的全服务事件广播机制 ```xml org.lhyf.cloud event-spring-boot-starter ${project.version} ``` ## 添加自定义线程池 > 三个目标 - 在项目中能通过线程池提交任务 - @Async注解也能使用相同的自定义线程池 - 在使用线程池时,不能遗失业务上下文,以及TraceId等信息 > 先查看配置类 如果不自定义线程池,将使用自动配置的线程池 org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration ```java @Slf4j @EnableAsync @Configuration public class ThreadPoolAutoConfiguration { /** * 必须使用@Primary 修饰 * @param taskDecorator * @return */ @Primary @Bean public ThreadPoolTaskExecutor threadPoolTaskExecutor(TaskDecorator taskDecorator) { //cpu 核心数 int cpuCoreCount = Runtime.getRuntime().availableProcessors(); log.info("CPU核心数: " + cpuCoreCount); ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(8); threadPoolTaskExecutor.setThreadNamePrefix("task-pool" + "-"); threadPoolTaskExecutor.setQueueCapacity(cpuCoreCount * 20); threadPoolTaskExecutor.setMaxPoolSize(cpuCoreCount * 100); threadPoolTaskExecutor.setKeepAliveSeconds(600); threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); // 添加一个装饰器,用来处理上下文 threadPoolTaskExecutor.setTaskDecorator(taskDecorator); log.info("线程池注入成功:{}", threadPoolTaskExecutor); return threadPoolTaskExecutor; } /** * 处理多线程下,上下文环境 * * @return */ @Bean public TaskDecorator taskDecorator() { return new ContextDecorator(); } } ``` 为什么这样就可以了?? 业务使用线程池比较简单,直接将 ThreadPoolTaskExecutor 自动注入业务类,即可使用. > @Async使用线程池 @Async标注的方法所在的类将被创建动态代理,在该方法被调用时,将被执行动态代理的逻辑, 主要逻辑在 org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke 方法中, 其中有一个获取 AsyncTaskExecutor 的方法,依次看下去,可以找看到 在向容器中获取 TaskExecutor 类型的实例时,优先获取到我们自己定义的,从而实现了 @Async使用自定义的线程池 ```java protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) { if (beanFactory != null) { try { // Search for TaskExecutor bean... not plain Executor since that would // match with ScheduledExecutorService as well, which is unusable for // our purposes here. TaskExecutor is more clearly designed for it. // 尝试从容器中找 TaskExecutor 的实例,如果找到多个,将进入到异常处理环节, // 但是由于我们自己定义的添加 @Primary ,将会被这里获取到 return beanFactory.getBean(TaskExecutor.class); } catch (NoUniqueBeanDefinitionException ex) { logger.debug("Could not find unique TaskExecutor bean", ex); try { // 如果上面因为存在多个而抛出了异常,将获取默认的 return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info("More than one TaskExecutor bean found within the context, and none is named " + "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " + "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { logger.debug("Could not find default TaskExecutor bean", ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { logger.info("No task executor bean found for async processing: " + "no bean of type TaskExecutor and no bean named 'taskExecutor' either"); } // Giving up -> either using local default executor or none at all... } } return null; } ``` > 在线程间传递上下文 业务上下文的传递 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor#initializeExecutor ```java protected ExecutorService initializeExecutor( ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue queue = createQueue(this.queueCapacity); ThreadPoolExecutor executor; if (this.taskDecorator != null) { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) { @Override public void execute(Runnable command) { // 使用配置的装饰器传递业务上下文 Runnable decorated = taskDecorator.decorate(command); if (decorated != command) { decoratedTaskMap.put(decorated, command); } super.execute(decorated); } }; } else { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); } if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; } ``` ```java @Slf4j public class ContextDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { try { // 从父线程获取上下文信息 String context = ContextThreadLocal.getContext(); log.debug("当前线程:" + Thread.currentThread().getName()); log.debug("当前线程context:" + context); return () -> { try { // 设置在子线程中 ContextThreadLocal.setContext(context); log.debug("当前线程:" + Thread.currentThread().getName()); log.debug("当前线程context:" + context); runnable.run(); } finally { // 移除上下文 ContextThreadLocal.remove(); } }; } catch (IllegalStateException e) { return runnable; } } } ``` 链路上下文的传递 org.springframework.cloud.sleuth.instrument.async.ExecutorBeanPostProcessor#postProcessAfterInitialization