# springboot-jedis-seckill-demo **Repository Path**: tzs1993/springboot-jedis-seckill-demo ## Basic Information - **Project Name**: springboot-jedis-seckill-demo - **Description**: springboot整合redis(使用jedis客户端)完成分布式锁,场景是简单的秒杀系统,包含技术栈:springboot+mysql+mybatis+redis+rabbitmq - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 4 - **Created**: 2021-03-07 - **Last Updated**: 2022-06-04 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 入门篇 ## 前言 本节将以springboot为基础,结合Redis 和 RabbitMQ做一个秒杀系统的demo,主要展示Redis分布式锁以及消息队列的使用。 秒杀系统的主要基于以下的原则去实现 1. 系统初始化时,把商品存库数量加载到redis中 2. 当收到秒杀请求后,redis预减库存,库存不足则直接返回 3. 秒杀成功的请求入rabbitMQ,立即返回“正在抢购页面…”,当异步下单成功后才返回订单。 4. 客户端轮询是否秒杀成功,服务器请求出队,生成订单,减少库存。 ## pom文件 ```xml 4.0.0 org.springframework.boot spring-boot-starter-parent 2.3.3.RELEASE com.example Seckill 0.0.1-SNAPSHOT Seckill Demo project for Spring Boot 8 org.springframework.boot spring-boot-starter-web tk.mybatis mapper-spring-boot-starter 2.0.4 org.springframework.boot spring-boot-starter-jdbc 2.0.0.RELEASE org.mybatis.spring.boot mybatis-spring-boot-starter 2.0.1 mysql mysql-connector-java 5.1.17 com.alibaba druid 1.1.1 org.projectlombok lombok 1.16.22 org.springframework.boot spring-boot-starter-amqp 2.1.8.RELEASE org.springframework.boot spring-boot-test 2.2.6.RELEASE junit junit 4.12 org.springframework spring-test 5.2.5.RELEASE org.springframework.boot spring-boot-starter-data-redis 2.2.0.RELEASE io.lettuce lettuce-core redis.clients jedis com.alibaba fastjson 1.2.57 org.apache.commons commons-lang3 3.5 commons-codec commons-codec 1.10 org.springframework.boot spring-boot-maven-plugin ``` ## application.yml 配置 ​ 主要配置了数据库,Redis,RabbitMQ的配置 ```yml server: port: 7999 spring: servlet: multipart: max-request-size: 100MB max-file-size: 20MB http: encoding: charset: utf-8 force: true enabled: true datasource: platform: mysql type: com.alibaba.druid.pool.DruidDataSource initialSize: 5 minIdle: 3 maxActive: 500 maxWait: 60000 timeBetweenEvictionRunsMillis: 60000 minEvictableIdleTimeMillis: 30000 validationQuery: select 1 testOnBorrow: true poolPreparedStatements: true maxPoolPreparedStatementPerConnectionSize: 20 driverClassName: com.mysql.jdbc.Driver url: jdbc:mysql://192.168.43.101:3306/order_db?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf-8&useAffectedRows=true&rewriteBatchedStatements=true username: root password: accp rabbitmq: host: 192.168.43.101 port: 5672 username: guest password: guest redis: host: 192.168.43.101 port: 6379 timeout: 2000 pool: max-idle: 100 min-idle: 1 max-active: 1000 max-wait: -1 ``` ## 订单model以及对应的mybatis配置 本文只是做一个订单的记录,所以表的字段比较简单 ```sql CREATE TABLE `order_t` ( `order_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键 订单ID', `user_id` varchar(128) DEFAULT NULL COMMENT '用户Id', `product_id` varchar(128) DEFAULT NULL COMMENT '产品Id', `create_time` bigint(20) DEFAULT NULL COMMENT '时间', PRIMARY KEY (`order_id`) ) ENGINE=InnoDB AUTO_INCREMENT=36255 DEFAULT CHARSET=utf8mb4; ``` model ```java package com.seckill.model; import lombok.Data; import javax.persistence.*; import java.io.Serializable; @Data @Table(name = "order_t") public class Order implements Serializable { @Id @Column(name = "order_id") @GeneratedValue(strategy= GenerationType.IDENTITY) private Integer orderId; private String userId; private String productId; private Long createTime; } ``` 本项目中使用了mybatis的通用mapper tkmybatis,所以配置文件中都是空的 ```java package com.seckill.mapper; import com.seckill.model.Order; import tk.mybatis.mapper.common.Mapper; import tk.mybatis.mapper.common.MySqlMapper; public interface OrderMapper extends Mapper, MySqlMapper { } ``` resources/mapper/OrderMapper.xml ```xml ``` ## redis工具类 ```java package com.seckill.util; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.GenericToStringSerializer; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; @Slf4j @Component public class RedisClient { @Resource private RedisTemplate redisTemplate; @PostConstruct public void init() { redisTemplate.setKeySerializer(new GenericToStringSerializer<>(String.class)); } /** * redis存值 * * @param key 键 * @param value 值 */ public void set(String key, Object value) { redisTemplate.opsForValue().set(key, value); } /** * hash存 * * @param key 键 * @param hash hash * @param value 值 */ public void set(String key, String hash, String value) { redisTemplate.opsForHash().put(key, hash, value); } /** * redis获取值 * * @param key 键 * @return 返回值 */ public Object get(String key) { return redisTemplate.opsForValue().get(key); } /** * hash取值 * * @param key 键 * @param hash hash * @return 返回redis存储的值 */ public String get(String key, String hash) { return (String) redisTemplate.opsForHash().get(key, hash); } /** * 获取redis的锁 * * @param key 键 * @param value 值为当前毫秒数+过期时间毫秒数 * @return 返回true/false */ public boolean lock(String key, String value) { if (redisTemplate.opsForValue().setIfAbsent(key, value)) { return true; } String oldExpireTime = (String) redisTemplate.opsForValue().get(key); if (!StringUtils.isEmpty(oldExpireTime ) && Long.parseLong(oldExpireTime ) < System.currentTimeMillis()) { String currentExpireTime = (String) redisTemplate.opsForValue().getAndSet(key, value); if (!StringUtils.isEmpty(currentExpireTime) && currentExpireTime.equals(oldExpireTime )) { return true; } } return false; } /** * redis释放锁 * * @param key 键 * @param value 值 */ public void unlock(String key, String value) { //执行删除可能出现异常需要捕获 try { String currentValue = (String) redisTemplate.opsForValue().get(key); if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) { //如果不为空,就删除锁 redisTemplate.opsForValue().getOperations().delete(key); } } catch (Exception e) { log.error("[redis分布式锁] 解锁", e); } } } ``` ## RabbitMQ配置 常量 ```java package com.seckill.config; public class RabbitConstants { /** * 分列模式 */ public final static String FANOUT_MODE_QUEUE = "fanout.mode"; /** * 日志打印队列 */ public final static String QUEUE_LOG_PRINT = "queue.log.recode"; /** * 主题模式 */ public final static String TOPIC_MODE_QUEUE = "topic.mode"; /** * 主题模式 */ public final static String TOPIC_ROUTING_KEY = "topic.*"; } ``` 配置 ```java package com.seckill.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration public class RabbitMqConfig { @Bean public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData[{}],ack[{}],cause[{}]", correlationData, ack, cause)); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange[{}],route[{}],replyCode[{}],replyText[{}],message:{}", exchange, routingKey, replyCode, replyText, message)); return rabbitTemplate; } /** * 日志打印队列 */ @Bean public Queue logPrintQueue() { return new Queue(RabbitConstants.QUEUE_LOG_PRINT); } /** * 分列模式队列 */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(RabbitConstants.FANOUT_MODE_QUEUE); } /** * 分列模式绑定队列 * * @param logPrintQueue 绑定队列 * @param fanoutExchange 分列模式交换器 */ @Bean public Binding fanoutBinding(Queue logPrintQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(logPrintQueue).to(fanoutExchange); } /** * 主题队列 */ @Bean public Queue topicQueue() { return new Queue(RabbitConstants.TOPIC_ROUTING_KEY); } /** * 主题模式队列 *
  • 路由格式必须以 . 分隔,比如 user.email 或者 user.aaa.email
  • *
  • 通配符 * ,代表一个占位符,或者说一个单词,比如路由为 user.*,那么 user.email 可以匹配,但是 user.aaa.email 就匹配不了
  • *
  • 通配符 # ,代表一个或多个占位符,或者说一个或多个单词,比如路由为 user.#,那么 user.email 可以匹配,user.aaa.email 也可以匹配
  • */ @Bean public TopicExchange topicExchange() { return new TopicExchange(RabbitConstants.TOPIC_MODE_QUEUE); } /** * 主题模式绑定队列2 * * @param topicQueue 主题队列 * @param topicExchange 主题模式交换器 */ @Bean public Binding topicBinding(Queue topicQueue, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue).to(topicExchange).with(RabbitConstants.TOPIC_ROUTING_KEY); } } ``` ## 抢单逻辑OrderService ```java package com.seckill.service; import com.seckill.config.RabbitConstants; import com.seckill.mapper.OrderMapper; import com.seckill.util.RedisClient; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; @Component public class OrderService { public static final String PRODUCT_ID_KEY = "PID001_"; private static final Integer PRODUCT_COUNT = 5000; private static final String HAS_BUY_USER_KEY = "HAS_BUY_USER_KEY_"; private static final String LOCK_KEY = "LOCK_KEY_"; private static final String FAIL_BUYED = "已经买过了"; private static final String BUYE_SUCCESS = "抢到了,订单生成中"; private static final String FAIL_SOLD_OUT = "没货了"; private static final String FAIL_BUSY = "排队中,请重试!"; @Resource private RedisClient redisClient; @Resource private OrderMapper orderMapper; @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void initOrder() { redisClient.set(PRODUCT_ID_KEY, PRODUCT_COUNT); System.out.println("商品已经初始化完成:数量:" + PRODUCT_COUNT); } public String insertOrder(String userId) { //判断用户是否已买 Object hasBuy = redisClient.get(HAS_BUY_USER_KEY, userId); if (hasBuy != null) { return FAIL_BUYED; } //10s自动过期 int redisExpireTime = 10 * 1000; long lockValue = System.currentTimeMillis() + redisExpireTime; //获取redis锁,只有获取成功才能继续操作 boolean getLock = redisClient.lock(LOCK_KEY, String.valueOf(lockValue)); System.out.println(userId + " getLock:" + getLock); if (getLock) { Integer productCount = (Integer) redisClient.get(PRODUCT_ID_KEY); System.out.println("productCount:" + productCount); //库存大于0才能继续下单 if (productCount > 0) { rabbitTemplate.convertAndSend(RabbitConstants.TOPIC_MODE_QUEUE, "topic.queue", userId); //减库存 redisClient.set(PRODUCT_ID_KEY, (productCount - 1)); //记录用户已买 redisClient.set(HAS_BUY_USER_KEY, userId, "1"); //手动释放锁 redisClient.unlock(LOCK_KEY, String.valueOf(lockValue)); return BUYE_SUCCESS; } else { System.out.println("亲," + FAIL_SOLD_OUT); //手动释放锁 redisClient.unlock(LOCK_KEY, String.valueOf(lockValue)); return FAIL_SOLD_OUT; } } else { return FAIL_BUSY; } } } ``` ## 消息队列处理订单入库 ```java package com.seckill.service; import com.seckill.config.RabbitConstants; import com.seckill.mapper.OrderMapper; import com.seckill.model.Order; import com.seckill.util.RedisClient; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; @Slf4j @Component public class RabbitMqHandler { @Resource private RedisClient redisClient; @Resource private OrderMapper orderMapper; /** * 日志打印处理handler * * @param message 待处理的消息体 */ @RabbitListener(queues = RabbitConstants.QUEUE_LOG_PRINT) public void queueLogPrintHandler(String message) { log.info("接收到操作日志记录消息:[{}]", message); } /** * 主题模式处理handler * * @param message 待处理的消息体 */ @RabbitListener(queues = RabbitConstants.TOPIC_ROUTING_KEY) public void queueTopicHandler(String message) { log.info("主题模式处理器,接收消息:[{}]", message); String userId = message; //产生订单 System.out.println("userId:" + userId); Order order = new Order(); order.setProductId(OrderService.PRODUCT_ID_KEY); order.setUserId(userId); order.setCreateTime(System.currentTimeMillis()); orderMapper.insert(order); System.out.println("用户:" + userId + "下单成功"); } } ``` ## 启动类 ```java package com.seckill; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import tk.mybatis.spring.annotation.MapperScan; @SpringBootApplication @MapperScan("com.seckill.mapper") public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } } ``` ## 抢单Controller ```java package com.seckill.controller; import com.seckill.service.OrderService; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController @RequestMapping("/order") public class OrderController { @Resource private OrderService orderService; @GetMapping("/addOrder") public String addOrder(String userId) { return orderService.insertOrder(userId); } } ``` 后续只需要在controller中添加方法用于查询用户对应订单,前台定时轮询即可 ## 测试 ​ 写一个多线程程序进行测试,可以看到最后的数据完全正确 ```java package com.seckill.test; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class OrderThreadTest implements Runnable { @Override public void run() { try { httpURLGETCase(); } catch (Exception e) { e.printStackTrace(); } } private void httpURLGETCase() { String userId = UUID.randomUUID().toString().replaceAll("-", ""); String methodUrl = "http://127.0.0.1:7999/order/addOrder?userId=" + userId; System.out.println("开始访问:" + methodUrl); HttpURLConnection connection = null; BufferedReader reader = null; String line = null; try { URL url = new URL(methodUrl); connection = (HttpURLConnection) url.openConnection(); // 根据URL生成HttpURLConnection connection.setRequestMethod("GET"); // 默认GET请求 connection.connect(); // 建立TCP连接 if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) { reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); // 发送http请求 StringBuilder result = new StringBuilder(); // 循环读取流 while ((line = reader.readLine()) != null) { result.append(line).append(System.getProperty("line.separator")); // "\n" } System.out.println("结果" + result.toString()); if (result.toString().contains("没货了")) { long endTine = System.currentTimeMillis(); long useTime = endTine - beginTime; //共耗时:102041毫秒 //共耗时:82159毫秒 System.out.println("共耗时:" + useTime + "毫秒"); System.exit(0); } } } catch (IOException e) { e.printStackTrace(); } finally { try { reader.close(); } catch (IOException e) { e.printStackTrace(); } connection.disconnect(); } } static long beginTime; public static void main(String[] args) { beginTime = System.currentTimeMillis(); ExecutorService es = Executors.newFixedThreadPool(10000); OrderThreadTest mythread = new OrderThreadTest(); Thread thread = new Thread(mythread); for (int i = 0; i < 1000001; i++) { es.execute(thread); } } } ``` 1000001个线程抢5000个商品,测试运行太慢的话,可以把商品数调小 ## 关键代码说明 着重讲解RedisClient类中的lock方法 一、redis命令讲解 1、setnx()命令: setnx的含义就是SET if Not Exists,其主要有两个参数 setnx(key, value)。 该方法是原子的,如果key不存在,则设置当前key成功,返回1; 如果当前key已经存在,则设置当前key失败,返回0。 2、get()命令: get(key) 获取key的值,如果存在,则返回;如果不存在,则返回nil;  3、getset()命令:getset(key, newValue)。原子方法,对key设置newValue这个值,并且返回key原来的旧值。 假设key原来是不存在的,那么多次执行这个命令,会出现下边的效果:  1. getset(key, “value1”) 返回nil 此时key的值会被设置为value1  2. getset(key, “value2”) 返回value1 此时key的值会被设置为value2  3. 依次类推!  二.具体的使用步骤如下:  1. setnx(lockkey, 当前时间+过期超时时间) ,如果返回1,则获取锁成功;如果返回0则没有获取到锁,转向2。  2. get(lockkey)获取值oldExpireTime ,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向3。  3. 计算newExpireTime=当前时间+过期超时时间,然后getset(lockkey, newExpireTime) 会返回当前lockkey的值currentExpireTime。  4. 判断currentExpireTime与oldExpireTime 是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。  5. 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行delete释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。 ## 遗留问题 Redis锁的过期时间小于业务的执行时间该如何续期? 思路一:任务执行的时候,开辟一个守护线程,在守护线程中每隔一段时间重新设置过期时间。 思路二:通过Redisson中的看门狗来实现。