# SpringCLoud Alibaba **Repository Path**: kaunghe/spring-cloud-alibaba ## Basic Information - **Project Name**: SpringCLoud Alibaba - **Description**: 包括 alibaba 的 nacos ,sentinel,seata,rocketmq 以及 SpringCloud 的 feign,gateway, skywalking 等组件 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 4 - **Created**: 2022-07-12 - **Last Updated**: 2022-07-12 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 1: pom 依赖说明 ![img.png](image/pom版本依赖.png) # 2:组件依赖说明 ![img.png](image/组件依赖.png) # 3:注册中心:nacos 启动 1:安装 nacos 下载地址: https://github.com/alibaba/nacos/releases 下载zip格式的安装包,然后进行解压缩操作 2:启动nacos ``` #切换目录cd nacos/bin #命令启动 startup.cmd -m standalone ``` 3:服务使用 添加依赖 ``` com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery 2021.1 ``` 启动类添加注解 ``` @EnableDiscoveryClient ``` 配置文件添加配置 ``` spring: cloud: nacos: discovery: server-addr: 127.0.0.1:8848 ``` 查看:打开浏览器输入http://localhost:8848/nacos,即可访问服务, 默认密码是nacos/nacos ## nacos 集群 ps:本例不使用 1:上传 nacos 包到 linux系统中,并解压 2:修改 nacos 配置文件 application.properties ```shell #修改端口号 server.port=3333 #(如果linux装的是Mysql5.+,在application.properties最后粘贴) spring.datasource.platform=mysql db.num=1 db.url.0=jdbc:mysql://127.0.0.1:3306/nacos_config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&serverTimezon e=UTC db.user=root db.password=123456 #(如果linux装的是Mysql8.+,在application.properties最后写:) spring.datasource.platform=mysql jdbc.DriverClassName=com.mysql.cj.jdbc.Driver # 指定数据库实例数量 db.num=1 # 第一个数据库实例地址 db.url.0=jdbc:mysql://127.0.0.1:3306/nacos_config?serverTimezone=GMT%2B8&characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true db.user= root db.password= 123456 ``` 3:修改conf下的cluster.conf文件 ```shell ip1:端口1 ip2:端口2 ip3:端口3 ``` 4:修改 bin 目录下的 startup.sh,将 JVM 参数调小 5:启动 mysql ```shell mysql -uroot -p # 创建数据库 create database nacos_config; #切换目录并导入脚本 cd nacos/conf mysql -uroot -h 127.0.0.1 -p'123456' -P 3306 nacos_config < nacos-mysql.sql ``` 6:复制 Nacos,分别修改 application.properties 下端口,在 bin 目录下 ./startup.sh -p 端口启动 7:进入 nginx 的 conf 文件夹,修改 nginx.conf 文件 ![img.png](image/nginx%20配置.png) 8:来到 nginx 的 sbin 目录,启动 nginx , ./nginx ,到浏览器访问 nginx 地址即可。 # 4:远程调用和负载均衡:Feign && ribbon Feign 集成了 ribbon, Nacos 很好的兼容了 Feign, 所以 Nacos 使用 Feign 默认实现了负载均衡的策略 添加依赖 ``` org.springframework.cloud spring-cloud-starter-openfeign 3.1.1 org.springframework.cloud spring-cloud-starter-loadbalancer 3.1.1 ``` 启动类添加注解 ``` @EnableFeignClients ``` 创建接口 ``` @FeignClient("shop-product") public interface ProductService { @GetMapping(value = "/product/{pid}") Product findByPid(@PathVariable("pid") Integer pid); } ``` 实现调用 ``` productService.findByPid(pid); ``` # 5:限流降级:Sentinel 使用 作用是:限流,熔断 首先是依赖 ``` com.alibaba.cloud spring-cloud-starter-alibaba-sentinel 2021.1 ``` 然后是 配置文件配置 ``` # sentinel sentinel: transport: dashboard: localhost:8080 # springboot 2.6 以上的循环依赖问题 main: allow-circular-references: true ``` 下载 jar 包 ``` # 下载地址 https://github.com/alibaba/Sentinel/releases # 下载后启动 java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard-1.8.3.jar ``` 浏览器访问localhost:8080 进入控制台 ( 默认用户名密码是 sentinel/sentinel ),到页面上设置限流和降级规则 # 6:Feign 结合 Sentinel 首先,需要引入二者的依赖 然后,配置配置文件 ``` # 开启 Feign对Sentinel的支持 feign: sentinel: enabled: true ``` 然后创建容错类,实现 Feign 接口 ```java /** * 容错类要求必须实现被容错的接口,并为每个方法实现容错方案 */ @Component @Slf4j public class ProductServiceFallBack implements ProductService { @Override public Product findByPid(Integer pid) { Product product = new Product(); product.setPid(-1); return product; } } ``` Feign 接口修改 ```java @FeignClient(value = "shop-product", fallback = ProductServiceFallBack.class) public interface ProductService { @RequestMapping("/product/{pid}")//指定请求的URI部分 Product findByPid(@PathVariable Integer pid); } ``` # 7:服务网关:gateway 结合 nacos 引入依赖, ``` com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery 2021.1 org.springframework.cloud spring-cloud-starter-loadbalancer 3.1.1 org.springframework.boot spring-boot-starter-webflux org.springframework.cloud spring-cloud-starter-gateway ``` 在主类上添加注解 ``` @EnableDiscoveryClient ``` 修改配置文件 ``` server: port: 7000 spring: application: name: api-gateway cloud: # nacos nacos: discovery: server-addr: 127.0.0.1:8848 # gateway gateway: discovery: locator: # 让gateway可以发现nacos中的微服务 enabled: true ``` 通过浏览器访问: localhost:7000/服务名称/请求路径 # 8:链路跟踪:skywalking 使用 下载地址 ``` http://skywalking.apache.org/downloads/ ``` 解压后进入到 bin 目录 ``` # linux 启动 ./startup.sh # windows 启动 双击 startup.bat ``` 如果是 gateway 集成,要将optional-plugins下的插件apm-spring-cloud-gateway-2.x-plugin-6.5.0.jar拷贝到plugins下 另外,项目启动时可以加入参数,需要跟踪的服务上都要加 ``` -javaagent:D:/Mysofts/SpringCloudAlibaba/skywalking/apache-skywalking-apm-8.9.1/apache-skywalking-apm-bin/agent/skywalking-agent.jar -Dskywalking.agent.service_name=provider -Dskywalking.collectorbackend_service=127.0.0.1:11800 ``` # 9:配置中心: nacos && config && apollo congig: 没有可视化界面,配置的生效不是实时的,需要重启或者刷新 apollo: 携程开发,可以实时生效,支持灰度发布,对配置可以进行版本控制,资料也很全 nacos: 首先是引入依赖 ``` com.alibaba.cloud spring-cloud-starter-alibaba-nacos-config 2021.1 org.springframework.cloud spring-cloud-starter-bootstrap 3.1.1 ``` 然后在 将 application.yml 修改成 bootstrap.yml ,并添加以下内容 ``` # nacos nacos: discovery: server-addr: 127.0.0.1:8848 config: server-addr: 127.0.0.1:8848 file-extension: yaml # 环境标识 profiles: active: dev ``` 然后在需要动态读取配置文件的类上加上注解 ```java @RestController //不加此注解,改完之后需要重启,通过cglib 代理类回调钩子函数 @RefreshScope public class TestController { @Value("${test.appName}") private String appName; @GetMapping("/user/test") public String test(){ return appName; } } ``` 最后可以在 nacos 继续配置 。 Data ID : 服务名称-环境标识.yaml # 10:消息驱动: rocketmq 加上依赖,发送端和接收端都要配置,mq 也要启动,这个就不说了 ``` org.apache.rocketmq rocketmq-spring-boot-starter 2.2.1 org.apache.rocketmq rocketmq-client 4.9.2 ``` 发送端配置文件进行配置 ``` rocketmq: name-server: 192.168.10.150:9876 producer: group: shop-order ``` 发送端发送消息代码 ``` @Autowired private RocketMQTemplate rocketMQTemplate; // 发送消息 rocketMQTemplate.convertAndSend("order-topic", order); ``` 消费端配置文件 ``` rocketmq: name-server: 192.168.10.150:9876 ``` 消费端监听消息代码 ``` @Slf4j @Service @RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic") public class MessageListener implements RocketMQListener { @Override public void onMessage(Order order) { log.info("收到一个订单信息{},接下来发送短信", JSON.toJSONString(order)); } } ``` # 11:分布式事务:Seata 1:从官网下载 server 和 源码包,并进行解压。下载地址 ``` https://github.com/seata/seata/releases. ``` 2:进入源码包,找到 seata-1.4.2\seata-1.4.2\script\server\db\mysql.sql, 新建一个数据库 seata, 执行下面的建表 sql ```sql -- the table to store GlobalSession data CREATE TABLE IF NOT EXISTS `global_table` ( `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `status` TINYINT NOT NULL, `application_id` VARCHAR(32), `transaction_service_group` VARCHAR(32), `transaction_name` VARCHAR(128), `timeout` INT, `begin_time` BIGINT, `application_data` VARCHAR(2000), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`xid`), KEY `idx_gmt_modified_status` (`gmt_modified`, `status`), KEY `idx_transaction_id` (`transaction_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; -- the table to store BranchSession data CREATE TABLE IF NOT EXISTS `branch_table` ( `branch_id` BIGINT NOT NULL, `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `resource_group_id` VARCHAR(32), `resource_id` VARCHAR(256), `branch_type` VARCHAR(8), `status` TINYINT, `client_id` VARCHAR(64), `application_data` VARCHAR(2000), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`branch_id`), KEY `idx_xid` (`xid`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; -- the table to store lock data CREATE TABLE IF NOT EXISTS `lock_table` ( `row_key` VARCHAR(128) NOT NULL, `xid` VARCHAR(128), `transaction_id` BIGINT, `branch_id` BIGINT NOT NULL, `resource_id` VARCHAR(256), `table_name` VARCHAR(32), `pk` VARCHAR(36), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`row_key`), KEY `idx_branch_id` (`branch_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; ``` 在 所有参与分布式事务的数据库中新建表 ```sql CREATE TABLE IF NOT EXISTS `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; ``` 3:进入到 server 文件夹,先修改 file.conf 文件 ``` store { ## 1:这里修改成db mode = "db" ## database store property db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc. datasource = "druid" ## mysql/oracle/postgresql/h2/oceanbase etc. dbType = "mysql" ## 2: 这里修改成自己数据库的登录名和密码 ## mysql 5.xx版本的driverClassName : ## driverClassName = "com.mysql.jdbc.Driver" ## mysql 8.0的driverClassName : driverClassName = "com.mysql.cj.jdbc.Driver" url = "jdbc:mysql://127.0.0.1:3306/seata?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=true" user = "root" password = "admin" minConn = 5 maxConn = 100 globalTable = "global_table" branchTable = "branch_table" lockTable = "lock_table" queryLimit = 100 maxWait = 5000 } } ``` 4:启动 nacos,浏览器登录,在 命名空间 中新建命名空间 seata , 命名空间ID 不用填写,自动生成。复制生成的 命名空间ID 5:进入到 server 文件夹,修改 regiester.conf 文件 ``` #我使用的nacos作为配置中心和注册中心,使用将配置文件改为nacos registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "nacos" ##nacos修改的地方一 nacos { application = "seata-server" ##nacos服务的地址 serverAddr = "127.0.0.1:8848" group = "SEATA_GROUP" ##nacos中新建的一个命名空间,用于专门隔离seata的服务和配置 namespace = "73164d7c-6ea7-491c-b5a5-0da02d9d2d65" cluster = "default" username = "nacos" password = "nacos" } } config { # file、nacos 、apollo、zk、consul、etcd3 type = "nacos" ##nacos修改的地方二 nacos { ##nacos服务的地址: serverAddr = "127.0.0.1:8848" ## nacos中新建的一个命名空间,用于专门隔离seata的服务和配置 namespace = "73164d7c-6ea7-491c-b5a5-0da02d9d2d65" group = "SEATA_GROUP" username = "nacos" password = "nacos" ##seata使用1.4.2版本,新建的data id文件类型选择properties dataId = "seataServer.properties" } } ``` 6:来到 server 文件夹的 bin 目录下,点击 seata-server.bat 启动 seata-server 。启动成功后,可以到 nacos 上查看服务是否注册上去 7:打开源码包,本地修改seata-1.4.2\seata-1.4.2\script\config-center\config.txt. 注意上传的版本必须将所有注释#都直接删除掉,否则上传失败,还有必须将=后边给值,没有值的话给""; ``` 路径:script\config-center 修改了service.vgroupMapping和数据库地址和存储模式 ----------------------------------------------------------- transport.type=TCP transport.server=NIO transport.heartbeat=true transport.enableClientBatchSendRequest=false transport.threadFactory.bossThreadPrefix=NettyBoss transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler transport.threadFactory.shareBossWorker=false transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector transport.threadFactory.clientSelectorThreadSize=1 transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread transport.threadFactory.bossThreadSize=1 transport.threadFactory.workerThreadSize=default transport.shutdown.wait=3 # service # 1.自己命名一个vgroupMapping service.vgroupMapping.my_test_tx_group=default # seata的地址列表,此处是一个本地的 service.default.grouplist=127.0.0.1:8091 service.enableDegrade=false service.disableGlobalTransaction=false # client client.rm.asyncCommitBufferLimit=10000 client.rm.lock.retryInterval=10 client.rm.lock.retryTimes=30 client.rm.lock.retryPolicyBranchRollbackOnConflict=true client.rm.reportRetryCount=5 client.rm.tableMetaCheckEnable=false client.rm.tableMetaCheckerInterval=60000 client.rm.sqlParserType=druid client.rm.reportSuccessEnable=false client.rm.sagaBranchRegisterEnable=false client.tm.commitRetryCount=5 client.tm.rollbackRetryCount=5 client.tm.defaultGlobalTransactionTimeout=60000 client.tm.degradeCheck=false client.tm.degradeCheckAllowTimes=10 client.tm.degradeCheckPeriod=2000 # store # 2.model改为db store.mode=db store.publicKey="" store.file.dir=file_store/data store.file.maxBranchSessionSize=16384 store.file.maxGlobalSessionSize=512 store.file.fileWriteBufferCacheSize=16384 store.file.flushDiskMode=async store.file.sessionReloadReadSize=100 store.db.datasource=druid store.db.dbType=mysql # 3.修改数据库的连接信息driverClassName和url,user,password store.db.driverClassName=com.mysql.cj.jdbc.Driver # 改为上面创建的seata服务数据库 store.db.url=jdbc:mysql://127.0.0.1:3306/seata?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=true # 改为自己的数据库用户名 store.db.user=root # 改为自己的数据库密码 store.db.password=root store.db.minConn=5 store.db.maxConn=30 store.db.globalTable=global_table store.db.branchTable=branch_table store.db.queryLimit=100 store.db.lockTable=lock_table store.db.maxWait=5000 store.redis.mode=single store.redis.single.host=127.0.0.1 store.redis.single.port=6379 store.redis.sentinel.masterName="" store.redis.sentinel.sentinelHosts="" store.redis.maxConn=10 store.redis.minConn=1 store.redis.maxTotal=100 store.redis.database=0 store.redis.password="" store.redis.queryLimit=100 server.recovery.committingRetryPeriod=1000 server.recovery.asynCommittingRetryPeriod=1000 server.recovery.rollbackingRetryPeriod=1000 server.recovery.timeoutRetryPeriod=1000 server.maxCommitRetryTimeout=-1 server.maxRollbackRetryTimeout=-1 server.rollbackRetryTimeoutUnlockEnable=false # client.undo client.undo.dataValidation=true client.undo.logSerialization=jackson client.undo.onlyCareUpdateColumns=true server.undo.logSaveDays=7 server.undo.logDeletePeriod=86400000 client.undo.logTable=undo_log client.undo.compress.enable=true client.undo.compress.type=zip client.undo.compress.threshold=64k log.exceptionRate=100 transport.serialization=seata transport.compressor=none metrics.enabled=false metrics.registryType=compact metrics.exporterList=prometheus metrics.exporterPrometheusPort=9898 ---------------------------------------------------------------- 注: service.vgroupMapping.my_test_tx_group=default 与项目模块中yml配置的seata.tx-service-group内容相同 ``` 8:来到源码的 seata-1.4.2\seata-1.4.2\script\config-center\nacos 文件夹下,右键 Git Bash Here 执行命令,将配置上传到 nacos 中 ``` #有命名空间的方式: sh nacos-config.sh -h 127.0.0.1 -p 8848 -g SEATA_GROUP -t 73164d7c-6ea7-491c-b5a5-0da02d9d2d65 -u nacos -w nacos #没有命名空间,默认是 public sh nacos-config.sh -h 127.0.0.1 -p 8848 -g SEATA_GROUP -u nacos -w nacos ``` 9:执行成功后,可以通过 nacos 查看配置是否上传完成。 10:在使用 seata 的 所有微服务中需要添加以下的依赖, ``` io.seata seata-spring-boot-starter 1.4.2 com.alibaba.cloud spring-cloud-starter-alibaba-seata 2021.1 io.seata seata-spring-boot-starter com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery 2021.1 org.springframework.cloud spring-cloud-starter-loadbalancer 3.1.1 com.alibaba.cloud spring-cloud-starter-alibaba-nacos-config 2021.1 ``` 11:在使用 seata 的 所有微服务中配置文件中进行配置 ``` spring: application: name: shop-product datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/shop?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true username: root password: admin type: com.alibaba.druid.pool.DruidDataSource cloud: # nacos nacos: discovery: server-addr: 127.0.0.1:8848 config: server-addr: 127.0.0.1:8848 alibaba: #事务群组,要和下方vgroup-mapping保持一致(可以每个应用独立取名,也可以使用相同的名字), #要与服务端nacos-config.txt中service.vgroup_mapping中存在,并且要保证多个群组情况下后缀名要保持一致-tx_group seata: tx-service-group: my_test_tx_group # springboot 2.6 以上的循环依赖问题 main: allow-circular-references: true #seata配置 seata: application-id: ${spring.application.name} enable-auto-data-source-proxy: true #是否开启数据源自动代理,默认为true #(1)事务群组(可以每个应用独立取名,也可以使用相同的名字), #要与服务端nacos-config.txt中service.vgroupMapping.my_test_tx_group=default,并且要保证多个群组情况下后缀名要保持一致-tx_group service: vgroup-mapping: my_test_tx_group: default # (2)seata配置中心 config: type: nacos nacos: namespace: 8b4e0943-2463-4156-acf3-a6381fb7ed38 #nacos服务的地址 serverAddr: 127.0.0.1:8848 #seata分组名称 group: SEATA_GROUP username: "nacos" password: "nacos" # (3)seata的注册中心 registry: #registry根据seata服务端的registry配置 type: nacos nacos: application: seata-server #配置自己的seata服务 server-addr: 127.0.0.1:8848 #nacos服务的地址 group: SEATA_GROUP #seata分组名称 namespace: 8b4e0943-2463-4156-acf3-a6381fb7ed38 username: "nacos" #nacos服务登录名称 password: "nacos" #nacos服务登录密码 ``` 12: 在所有使用微服务的启动类上加上注解 ``` @EnableAutoDataSourceProxy ``` 13: 在调用方的方法上,加上注解 ``` @GlobalTransactional(rollbackFor = Exception.class) ``` # 12:分布式事务的其他解法(知识普及) 现在分布式事务的实现基本上都是柔性事务,最终一致性 ## 2.1:数据库本地如何保证事务? ACID:原子性,一致性,隔离性,永久性 数据库中,先写日志文件,在写数据文件。 锁(保证CI) redo+undo(保证AD) ## 2.2:两阶段提交 2PC 有一个中间的事务管理器(Trancastion Manager),简称 TM ,两个参与的服务(Resource Manager),简称 RM,RM 注册到 TM 上 理论过程: 1:预提交阶段:TM 给两个 RM 发出信号,两个RM 在本地执行逻辑,但不提交,完成后给 TM 返回结果 2:提交或回滚阶段:TM 收到信号,如果两个都没问题,TM 发出真正提交的信息,两个 RM 执行提交,完成后返回结果; 若两个有一个没有预提交成功,TM 发出回滚信号,两个 RM 分别回滚,并返回结果。根据返回结果,是否成功, 执行是否回滚的操作。二者提交成功,TM 释放资源 缺点: 1:TM 的单点问题,分布式事务系统瘫痪 2:阻塞资源:TM 不发出提交信号或发送请求信号断网,会导致 RM 的资源占用,超时一直等待,效率慢 3:数据不一致问题:TM 发提交命令的时候,一个信号发出去了,一个信号发送中 TM 挂了,会导致数据不一致的问题 ## 2.3:三阶段提交 3PC 为了优化两阶段的资源占用大的弊端,产生了三阶段,引出了超时机制,二阶段 TM 有超时,三阶段让 RM 和 TM 都具有超时机制 ps:只是降低灾难发生的概率,却不能完全解决,降低资源锁定的时间。 理论过程: 1:询问能否提交can commit 不占用资源,排查网络问题 2:pre commit,预提交,微服务没有收到事务协调器的pre commit,超时回滚;收到失败信号,也回滚 3:do commit .这一阶段,如果超时没有收到事务协调者发出的指令,默认提交 缺点: 1:TM 的单点问题 2:数据不一致问题,人工补偿.一般是通过脚本扫描,在一定时间内补偿. ## 2.4:消息队列 + 事件表 + 定时任务 ps:不适用于数据量比较大的情况 代码已经实现,具体参考 shop-order 的 MessageTrancastionController 和 OrderTask,shop-product 的 MessageListener 和 ProduceTask 理论过程: 1:请求进入系统 A ,A 处理业务,更新 DB,并把这个事件写入事件表,状态是新建,记录业务类型,事件类型,业务id等。 这个操作可以通过本地事务完成 ps:当A 系统默认处理完成后,就可以给客户端返回结果了,剩下的事务进行异步处理。 2:A 系统投定时任务,去读取 A 服务的事件表,把状态为新建的数据封装成消息发送给 mq,mq 接收到返回 ack,A 系统更改事件表中数据状态为发送。 这个操作可以通过本地事务完成。如果消息发送失败,定时任务会再次发送 3:B 服务监听消息,将消息插入到 B 服务的事件表,将状态修改为已接收。这个操作可以通过本地事务完成 4:B 服务有定时任务,从 B 系统的事件表中读取状态为已接收的数据,处理业务,更新事件表状态为已完成。 这个操作可以通过本地事务完成 ps:事件表可以做成冷热数据,对已经完成的数据放在历史表当中,供历史查询,时间表只存储热数据 缺点: 吞吐量高,弊端是调用链不易太长,数据并不能实时,最终一致性 ## 2.5:lnc(基于 2PC 实现) ps:tx-lcn Github地址: https://github.com/codingapi/tx-lcn lock:锁定事务单元 confirm:确定事务 notify:通知事务 理论过程: 1:请求进入 A 服务,A 服务会向 TM 发起创建一个事务组(空的,不知道有多少个服务),生成一个 groupID 2:A 服务调用其他服务,直到最后一个服务调用完成,才会实际调用的服务添加到事务组中。一阶段完成,阻塞资源 3:返回 A 之后,A 服务关闭事务,并通知 TM 成功或者失败,TM 的事务通知模块异步通知服务进行提交或者回滚。二阶段完成。 关键点:如何保证第一阶段执行的 sql ,在第二阶段发来提交或者回滚的请求能执行第一阶段 ps1: 需要保留第一阶段执行的连接,db 连接池,第一阶段不提交,需要的等待第二阶段的信号。重写 close 方法,不让提交, 把请求id 和 连接存储在 map 中,协调的本质:代理 dataSoure,保持了请求和 db 的连接 ps2: 如何第二阶段等不到回应怎么办?网断了? 补偿机制:发一次没回应,第二次记下服务执行的sql,并做下标识 ps3:TM 下 redis 的作用 存储事务组和补偿机制 代码服务,可以查看以下实现案例,需要启动 redis: ```shell https://gitee.com/jiang-qikun/transaction-lcn.git ``` ## 2.6:TCC ps:TCC 模式,不依赖于底层数据资源的事务支持 try confirm cancle ps:try,confirm,cancle失败重试,重试都不好就人工介入,或者告警. 一般自带事务的中间件,不用 TCC ,用 lcn,因为没必要,增加了业务的复杂度。 理论过程:(try ? confirm:cancle) 1:try:先尝试所有的业务,数据落库 2:confirm:如果try没有问题,执行cinfirm,什么都不做或者执行一些方法 3:cancle:如果try有一个失败,cancle做一些delete操作,做一些与try相反的操作,逆sql,可以通过 map 或者 ThreadLocal 保存 缺点:业务比较复杂,自带事务的中间件,比如 mysql ,不用 TCC,使用 lcn,其他的不带事务的中间件,比如 redis ,mongo,使用 TCC 代码服务,可以查看以下实现案例: ```shell https://gitee.com/jiang-qikun/transaction-lcn.git ``` ## 2.7:Seata(基于2PC实现) ```shell ## 官方文档 http://seata.io/zh-cn/docs/overview/what-is-seata.html ``` TC,事务协调者,相当于 lcn 中的 TM TM ,事务管理者,也叫事务的发起者,是一种特殊的 RM RM ,资源管理者,每个参与分布式事务的微服务 支持 AT,TCC,SAGA,XA 模式 ### 2.7.1:AT 模式 最常用的模式。 一阶段:RM 执行业务逻辑,并且记录回滚日志,提交本地事务,释放资源。参与的 RM 将执行结果告诉 TC 二阶段:如果 TC 收到的有一个为 NO,发出回滚命令,服务根据回滚日志进行回滚。回滚日志出错有重试机制。提交的时候通知删除回滚日志即可。 ps:当一个问题解决不了的时候,可以采用鸵鸟算法,不解决。 理论过程: 1:事务开启的服务,获取本地锁(一段代码,sql) 2:执行 sql 3: 获取全局锁,block lock,所谓的全局锁是指同一事物涉及到的所有服务的锁 4:提交本地事务 5:释放本地锁,并调用下一个 RM。全部链路完成后,全局提交,释放全局锁 ps:如果存在另一个事务,会不断的尝试获取全局锁,直到这个事务释放全局锁 ### 2.7.2:TCC 模式 相应的,TCC 模式,不依赖于底层数据资源的事务支持. Seata 支持的 TCC 模式,会造成一些问题: 1:空回滚:try 未执行,cancle 执行。 try 执行的时候落库超时,没执行成功,执行 cancle 解决方案:加一个事务控制表,存储执行状态 2:幂等:多次执行 cancle 或者 confirm。由于采取了重试机制,前面的请求成功,但是没有返回 解决方案:在事务控制表,里面存储状态,1事务初始化,2已提交,3已回滚 3:悬挂:cancle 在 try 之前执行。try 执行的过程,超时了,开始执行cancle ,执行完成后,又开始 try 了。 解决方法:用上面的 cancle 同样可以解决。 ## 2.8:业务系统+事务消息 z支持事务消息的 mq :rocketmq ```shell ## 示例代码 https://gitee.com/jiang-qikun/rocketmq-transaction.git ``` 理论过程: 1:A 服务收到请求,给 rocketmq 发送事务消息,是一个 half 消息 2:mq 给 A 服务 ack,执行本地业务,给 mq 发送确认消息或者回滚 3:B 服务监听 mq ,消费消息 ## 2.9:可靠消息服务 TCC 和 2PC 的吞吐量还是比较低的。可以用可靠消息服务来完成:是对消息队列+本地事务表的改进. 增加一个服务,专门来做这个事件表 理论过程: 1:请求进入系统 A 给可靠消息服务 C 发送一个待确认消息(包含要做的业务) 2:C保存消息到db,待确认状态,并给A一个回复. 3:A 得到回复,执行本地业务,完成后,给C发送一个确认或取消的消息 4:C 收到消息,如果是确认,修改db消息状态已确认;如果是取消,也是修改状为已取消 5: C 服务定时任务扫描db表,将数据状态为已确认的发送给 mq,mq 会给 ack 回复 5:B系统监听消息,消费消息,执行本地事务,给 mq 一个回复。如果是执行成功,调用服务C,修改消息状态以完成。 # 13:分布式锁 适用于,多个服务实例调用,不是在同一个 jvm 中 ```shell ## 代码示例 https://gitee.com/jiang-qikun/distributed-lock.git ``` 对于单服务,在一个 jvm 内,可以通过加锁来完成。 对于集群服务,即一个服务多个实例的情况下,是无法通过一个 jvm 来完成的。 ## 3.1:mysql 锁 ps:阻塞式锁,适用于并发量不大的系统 新建一个数据库表,并且代码中有对应的实体类A,有 ThreadLocal, ```shell //1:加锁 从 ThreadLocal 中获取一个实例对象,将这个实例对象插入到数据表中,擦加入成功,表示加锁成功,主键冲突会导致加锁失败。 抢锁失败的可以 sleep 一下,继续调用 lock 方法抢锁 //2:解锁 从 ThreadLocal 中获取一个实例对象的id,通过这个主键将数据从数据库表中删除,并从 ThreadLocal 中删除。 ps:如果解锁失败,可以通过 mysql 触发器来完成,定时删除数据,避免死锁 ``` ## 3.2:手写redis 锁 向 redis 存入一个值,并设置过期时间,防止死锁发生 ```shell 1:存入一个值,设置过期时间,会有加不上的情况? 原子操作,在一个步骤中完成,不要分成两步写。 2:解锁的过程中,对于同一个key,A 请求超时,锁超时,B 请求重新添加,A 恢复,A 可能会释放 B 的锁 在 value 里设置自己的值,释放的时候比较 value 值。 或者,起一个守护线程,监测业务执行情况,执行到三分之一的时候,去延长过期时间,延长到开始设置的过期时间(不建议使用) ``` ## 3.3:使用 单个RedissonClient 或者 Redisson 对 redis 的封装。 ```xml org.redisson redisson 3.3.2 ``` ## 3.4:使用 redis 红锁 还是上面那个 maven 依赖,不过需要多个 redis,不是集群 核心思想: 1:获取当前时间 2:按照 redis 顺序逐个加锁,默认一切加完,这个是顺利的情况;异常的情况,去判断加锁的 redis 是否已经过半 3:判断加锁时间是否超过过期时间,或者 redis 未过半,失败。所有的 redis 都要释放锁 存在的问题: ```shell 假设存在 5 个独立的 redis,当一个请求 A 来利用红锁加锁的时候,三个 redis 加锁成功,A 去执行业务了。此时,很不巧,加锁成功的 一个 redis 挂了,也没有做持久化,运维恢复成功后数据丢失;那么此时请求 B 来加锁,三个 redis 加锁成功,那么就存在一个锁被两个同时拥有的情况 解决方法:运维人员重新启动掉的 redis 的时候,延迟启动,时间稍微大于或者等于锁的过期时间 ```