# cloud-alibaba-plus
**Repository Path**: lhyf-org/cloud-alibaba-plus
## Basic Information
- **Project Name**: cloud-alibaba-plus
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 2
- **Forks**: 0
- **Created**: 2021-07-25
- **Last Updated**: 2023-07-15
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
## 基于Spring Cloud Alibaba的练习项目
> 使用技术栈
- OpenFeign
- Nacos
- Sentinel
- log4j2
- knife4j
> TODO
- 全局异常处理
- Sentinel流控规则持久化
- dashboard 需要将上报日志持久化
- 需要将配置推动到nacos,客户端监听nacos配置变更,刷新本地规则
- 相同限流规则,不同服务的限流配置在nacos中使用同一个配置文件
- 网关限流开发
- 日志配置
- 重新配置日志文件
- 单一配置文件打印不同服务名
- 彩色日志打印
- 并使用异步日志输出
- 启动打印 Nacos 配置
- 缓存与分布式锁
- RedisTemplate,RedisService 共享与多租户前缀配置
- RedisTemplate 复杂对象序列化与反序列化
- 启用Redisson连接Redis
- 使用Redisson 作为分布式锁
- 使用Redis作为分布式锁在集群模式下会有什么问题?
- Redis使用AP模型,可能在主节点加锁成功,尚未同步到从节点时宕机,此时从节点被选举为Leader,新的线程就能在从节点获取到锁
- 如果使用ZK是可以解决这个问题的(CP),只是性能可能没有Redis高
- RedLock
- 分布式发号器
- 使用美团开源的 leaf-snowflake
- 时间回拨问题, 如果在服务启动状态下回拨时间, 将直接导致发号失败
- 回拨时间后,再次启动,原本会出现报错,通过在启动时加上容器close() 方法,能缓解,但是如果容器的close()方法延迟调用,将会导致服务上传回拨后的时间到zookeeper,
从而响应zookeeper中存储的时间
- 数据库相关
- 字段自动填充
- 逻辑删除
- Mybatis-plus 提供的方法在查询时会自动带上逻辑删除字段
- 手写SQL查询,不会自动带上逻辑删除字段,需要自己指定
- 乐观锁
- 多租户与动态数据源
- 分页
- 字段加解密
- 字段脱敏
- 线程池与@Async注解线程池的处理
- 支持传递登录态
- 网关鉴权与登录态传递
- kafka相关
- 实现 traceID 在生产端与消费端传递
- 登录态在生产端与消费端传递
- 打印消息从进入Kafka到被消费所需时间耗时
- 接口文档
- knife4j
- 入参校验
- 完善代码生成器对于实体类的生成
## 服务端口分配
| 服务或节点名 | 端口 | 服务说明 |
| --------------- |------| ------------- |
| gateway-app | 8101 | 网关 |
| gateway-doc | 8102 | 接口文档 |
| server-auth | 8301 | 授权服务 |
| server-user | 8302 | 用户服务 |
| server-customer | 8303 | 客户服务 |
| server-order | 8304 | 订单服务区 |
| server-product | 8305 | 商品服务 |
| server-kafka | 8306 | kafka消费服务 |
| server-im | 8307 | 通信服务 |
| server-job | 8308 | 定时任务 |
## 日志
### log4j2 开启全局异步日志
*可能会使启动时的异常日志无法打印出来* !!!
- 导入 disruptor 包
```xml
com.lmax
disruptor
3.4.2
```
- 在resources目录下新增`log4j2.component.properties`文件,用于配置全局异步
```properties
Log4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector
```
- 查看效果
在线程中将看到一个名为 Log4j2-TF-1-AsyncLogger 的线程
## feign
若需要开启feign的fallback 与fallbackFactory 功能,需要开启服务的配置
```yaml
feign:
sentinel:
enabled: true
```
## sentinel 使用nacos持久化
[参考](https://www.sonake.com/2019/12/16/Sentinel-Nacos%E5%AE%9E%E7%8E%B0%E8%A7%84%E5%88%99%E6%8C%81%E4%B9%85%E5%8C%96/)
- 添加不同策略的Nacos拉取与推送类 `com.alibaba.csp.sentinel.dashboard.rule`
- 修改 `com.alibaba.csp.sentinel.dashboard.rule.nacos.NacosConfig`
- 修改dashboard依赖
- 修改微服务,加入依赖
```xml
com.alibaba.csp
sentinel-datasource-nacos
```
- 修改微服务配置,加入配置
```yaml
cloud:
sentinel: # 对应配置类 com.alibaba.cloud.sentinel.SentinelProperties
transport:
port: 8070 # 跟控制台交流的端口, 随意指定一个未使用的端口即可(控制台回调)
dashboard: 127.0.0.1:8080 # 指定控制台服务的地址
datasource:
flow:
nacos:
server-addr: ${spring.cloud.nacos.server-addr}
dataId: ${spring.application.name}-flow-rules
group-id: DEFAULT_GROUP
namespace: ${spring.cloud.nacos.config.namespace}
username: ${spring.cloud.nacos.username}
password: ${spring.cloud.nacos.password}
rule-type: flow
data-type: json
```
## Redisson
[参考](https://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95#26-%E5%8D%95redis%E8%8A%82%E7%82%B9%E6%A8%A1%E5%BC%8F)
### 分布式锁使用
```java
public class MessageServiceImpl implements MessageService {
@Autowired
private RedissonClient redissonClient;
@Override
public String getMsg() {
RLock lock = redissonClient.getLock("getmsg");
try {
int waitTime = 5;
//锁有效时间(秒)
int leaseTime = 20;
// 直接指定持有锁时间,无法启用看门狗功能,如果方法执行超过持有锁时间,还是会出现并发问题
boolean locked = lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
if (locked) {
log.info("线程获取锁成功:{}", Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(10);
} else {
log.warn("线程获取锁失败:{}", Thread.currentThread().getName());
}
} catch (InterruptedException e) {
log.error("{}", e);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
return "message";
}
}
```
## leaf-snowflake
高并发优化,如何榨干整台服务器
1.资源锁细粒度化 (如能使用行锁不要使用表锁)
2.业务使用锁细粒度化(能只锁部分代码,就不要锁整个方法)
3.无锁 (ThreadLocal 真无锁, CAS 假无锁)
4.异步+线程池 (CompletableFuture)
## Sleuth 链路追踪
1.自动配置类 TraceWebServletAutoConfiguration 注册一个Filter(TracingFilter),用于拦截请求
2.如果请求头中有相关的请求头,则使用传入的TraceID,否则创建一个新的
3.将Trace相关信息保存在ThreadLocal中(ThreadLocalCurrentTraceContext)
4.每次调用 Tracer 对象获取上下文时,就是从 ThreadLocal中获取
5.Trace 在请求头中传递的请求头名称查看 B3Propagation [b3-propagation](https://github.com/openzipkin/b3-propagation)
## kafka
1.生产者与消费者之间,登录态的传递
- 生产者使用 ProducerInterceptor 添加请求头, 使用 RecordInterceptor 取出请求头信息
- 生产者的添加 ProducerInterceptor 除了配置文件的方式外,可以通过定制化配置文件的方式添加
- 单个 RecordInterceptor 可以直接添加到容器中,如果要添加多个, 则需要将多个 RecordInterceptor ,则先添加到 CompositeRecordInterceptor,再将 CompositeRecordInterceptor 添加到容器中
- ConsumerInterceptor 一次传入多条消息,对于上下文这种不太合适,上下文类型的需求,需要单条消息处理
2.生产者与消费者之间,traceID的传递
TracingProducer 与 TracingConsumer 分别处理了 traceId 在kafka中的传递
3. 常用接口
- RecordInterceptor
- RecordMessageConverter
- KafkaHeaderMapper
4.配置
```yaml
spring:
kafka:
bootstrap-servers: 192.168.144.3:9092
listener:
missing-topics-fatal: false
consumer:
auto:
commit:
interval:
ms: 1000
auto-offset-reset: latest
enable-auto-commit: true
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
group:
id: defaultConsumerGroup
request:
timeout:
ms: 180000
session:
timeout:
ms: 120000
producer:
acks: 1 # 只要集群的Leader节点收到消息,生产者就会收到一个来自服务器的成功响应
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
linger:
ms: 0
retries: 0 # 发送失败时,重试次数
```
## 基于Kafka的全服务事件广播机制
```xml
org.lhyf.cloud
event-spring-boot-starter
${project.version}
```
## 添加自定义线程池
> 三个目标
- 在项目中能通过线程池提交任务
- @Async注解也能使用相同的自定义线程池
- 在使用线程池时,不能遗失业务上下文,以及TraceId等信息
> 先查看配置类
如果不自定义线程池,将使用自动配置的线程池 org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration
```java
@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolAutoConfiguration {
/**
* 必须使用@Primary 修饰
* @param taskDecorator
* @return
*/
@Primary
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor(TaskDecorator taskDecorator) {
//cpu 核心数
int cpuCoreCount = Runtime.getRuntime().availableProcessors();
log.info("CPU核心数: " + cpuCoreCount);
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(8);
threadPoolTaskExecutor.setThreadNamePrefix("task-pool" + "-");
threadPoolTaskExecutor.setQueueCapacity(cpuCoreCount * 20);
threadPoolTaskExecutor.setMaxPoolSize(cpuCoreCount * 100);
threadPoolTaskExecutor.setKeepAliveSeconds(600);
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
// 添加一个装饰器,用来处理上下文
threadPoolTaskExecutor.setTaskDecorator(taskDecorator);
log.info("线程池注入成功:{}", threadPoolTaskExecutor);
return threadPoolTaskExecutor;
}
/**
* 处理多线程下,上下文环境
*
* @return
*/
@Bean
public TaskDecorator taskDecorator() {
return new ContextDecorator();
}
}
```
为什么这样就可以了??
业务使用线程池比较简单,直接将 ThreadPoolTaskExecutor 自动注入业务类,即可使用.
> @Async使用线程池
@Async标注的方法所在的类将被创建动态代理,在该方法被调用时,将被执行动态代理的逻辑, 主要逻辑在 org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke 方法中, 其中有一个获取 AsyncTaskExecutor 的方法,依次看下去,可以找看到
在向容器中获取 TaskExecutor 类型的实例时,优先获取到我们自己定义的,从而实现了 @Async使用自定义的线程池
```java
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// Search for TaskExecutor bean... not plain Executor since that would
// match with ScheduledExecutorService as well, which is unusable for
// our purposes here. TaskExecutor is more clearly designed for it.
// 尝试从容器中找 TaskExecutor 的实例,如果找到多个,将进入到异常处理环节,
// 但是由于我们自己定义的添加 @Primary ,将会被这里获取到
return beanFactory.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
logger.debug("Could not find unique TaskExecutor bean", ex);
try {
// 如果上面因为存在多个而抛出了异常,将获取默认的
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskExecutor bean found within the context, and none is named " +
"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
logger.debug("Could not find default TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
logger.info("No task executor bean found for async processing: " +
"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
}
// Giving up -> either using local default executor or none at all...
}
}
return null;
}
```
> 在线程间传递上下文
业务上下文的传递
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor#initializeExecutor
```java
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
// 使用配置的装饰器传递业务上下文
Runnable decorated = taskDecorator.decorate(command);
if (decorated != command) {
decoratedTaskMap.put(decorated, command);
}
super.execute(decorated);
}
};
}
else {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}
```
```java
@Slf4j
public class ContextDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
try {
// 从父线程获取上下文信息
String context = ContextThreadLocal.getContext();
log.debug("当前线程:" + Thread.currentThread().getName());
log.debug("当前线程context:" + context);
return () -> {
try {
// 设置在子线程中
ContextThreadLocal.setContext(context);
log.debug("当前线程:" + Thread.currentThread().getName());
log.debug("当前线程context:" + context);
runnable.run();
} finally {
// 移除上下文
ContextThreadLocal.remove();
}
};
} catch (IllegalStateException e) {
return runnable;
}
}
}
```
链路上下文的传递
org.springframework.cloud.sleuth.instrument.async.ExecutorBeanPostProcessor#postProcessAfterInitialization