# redis学习 **Repository Path**: ayuanyuan1/redis-learning ## Basic Information - **Project Name**: redis学习 - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-06-19 - **Last Updated**: 2024-06-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # redis学习 ## 一、键 > 1、keys*: 查看当前库所有key > > 2、exists key: 判断某个key是否存在 > > 3、type key: 查看你的key是什么类型 > > 4、del key: 删除指定的key > > 5、unlink key: 根据value选择==非阻塞==(异步)删除 > > 6、expire key 10: 为给定的key设置过期时间,单位秒 > > 7、ttl key: 查看还有多少秒过期,-1表示永不过期,-2表示已过期 > > 9、select :命令切换数据库 > > 10、dbsize: 查看当前数据库的key的数量 > > 11、flushdb:清空当前库 > > 12、flushall: 通杀全部库 ## 二、常用数据类型 ### (1)字符串String > ·字符串最大长度为512M,单键单值 > > 1、set : 添加键值对,如果存在则修改 > > 2、get : 取值 > > 3、append : 追加值到末尾 > > 4、strlen : 获得值的长度 > > 5、setnx : 只有key不存在时,设置key的值 > > 6、incr : 将key中存储的数字值增加1,只能对数字操作,如果为空,新增值为1 > > 7、 decr : 将key中存储的数字值减1 > > 8、 incrby/decrby <步长>: 将key中存储的数字值加/减 步长 > > 9、mset:设置一个或多个key-value对 > > 10、mget:同时获取一个或多个value > > 11、msetnx:设置一个或多个key-value对,有一个失败则都==失败== > > 12、getrange <起始位置><结束位置>:获得值的范围,包前包后 > > 13、setrange <起始位置>:覆盖之前的从起始位置开始的值 > > 14、setex <过期时间>:设置键值的同时,设置过期时间 > > 15、getset:设置新值的同时获得旧值 ### (2)列表List > 单键多值,底层是双向列表 > > 1、lpush/rpush :从左边/右边插入一个或多个值 > > 2、lpop/rpop:吐出一个值。==值在键在,值亡键亡== > > 3、rpoplpush:从key1右边吐出一个值,插到key2左边 > > 4、lrange:按照索引下标获得元素,stop为 -1表示末尾 > > 5、lindex:按照索引下标获得元素 > > 6、llen:或者列表的长度 > > 7、linsertbefore/after:在value的后面/前面插入新value > > 8、lrem:从左边删除n个value > > 9、lset:将列表key下标为index的值替换成value ### (3)集合Set > 添加、删除、查找的复杂度都是O(1) > > 1、sadd :添加一个或多个元素,如果重复则忽略 > > 2、smembers:取出该集合的所有值 > > 3、sismember:判断集合是否存在value,有1,没有0 > > 4、scard:返回集合的元素个数 > > 5、srem:删除集合中某些元素 > > 6、spop:随机从改集合中吐出一个值 > > 7、srandmember:随机从集合中取出n个值,不会从集合中删除 > > 8、smove:把集合中一个值从一个集合移动到另外一个集合 > > 9、sinter:返回两个集合的交集元素 > > 10、sunion:返回两个集合的并集元素 > > 11、sdiff:返回两个集合的差集元素(key1中的,不包含key2中的) ### (4)哈希hash > key:{id=1,name=zhangsan,age=20} > > 1、hset :给集合中的field键赋value值 > > 2、hget :从集合field中取值 > > 3、hmset>:批量设置hash值 > > 4、hexists:判断field是否存在 > > 5、hkeys:列出集合的所有field > > 6、hvals:列出集合的所有value > > 7、hincrby:给field的值加上增量increment > > 8、hsetnx:给field的值设置为value,==field不存在== ### (5)有序集合Zset > 与set非常相似,是一个没有重复元素的字符串集合 > > 每个元素都都关联了一个评分,评分可以重复,通过评分可以排序(升序) > > 1、zadd:将一个或多个member元素及其score加入有序集key中 > > 2、zrange[withscores]:返回之间的元素,带withscores可以让分数和值一起返回到结果集 > > 3、zrangebyscore ==min max== [withscores] [limit off]:返回集合key中,所以score在min和max之间(包括边界)的元素,从小到大排序 > > 4、z==rev==rangebyscore ==max min== [withscores] [limit offset count]:从大到小 > > 5、zincrby :为元素的score加上增量 > > 6、zrem:删除该集合下指定value的值 > > 7、zcount :统计分数区间的元素个数 > > 8、zrank:返回改value值在集合中的排名,从0开始 ## 三、新数据类型 ### (1)Bitmaps > 对字符串进行位操作 > > 合理使用位操作可以提高内存使用率和开发速率 > > 1、setbit :设置bitmaps中某个偏移量的值(0或1) > > 2、getbit :获取偏移量offset的值,从0开始算 > > 3、bitcount:统计字符串被设置为1的数量 > > 4、bitcount [start,end]:统计字符串在start~end(-1最后一位,-2倒数第二位)范围内被设置为1的数量 > > 5、bitop and(or/not/xor) [key1,key2]:把key1和可以2的交集(并集,非,异或)的结果保存到destkey ### (2)HyperLogLog > 基数就是一个集合中除去相同值的基准数 > > 基数中统计相关的功能 > > 1、pfadd : 添加一个或多个元素,能够去重 > > 2、pfcount: 获得基数的数量 > > 3、pfmerge : 合并两个key1,key2到destkey中去 ### (3)Geospatial > 地理信息 > > 1、geoadd:添加一个或多个地理位置(经度,纬度,名称)==经度范围-180~180,纬度-85.051~85.051超出范围会报错== > > 2、geopos: 获得坐标值 > > 3、geodist[m/km/ft/mi]:获取两个位置的直线距离,默认单位米 > > 4、georadius m/km/ft/mi:以给定的维度为中心,找出某一半径内的元素 ## 四、发布和订阅 > 1、打开一个客户端订阅通道:channe1 ```linux subscribe channe1 ``` > 2、打开另外一个客户端在通道channe1发送消息hello ```linux publish channe1 hello #返回订阅者数量 ``` > 3、第一个客户端可以看到消息 ## 五、Jedis操作 > 添加jar > > 如果是配置在服务器上的redis连接不上,可以修改配置文件 > > 1、注释掉 bind 127.0.0.1 ,因为默认是本机访问 > > 2、保护模式protected-mo yes改成no > > 3、重启redis > > 4、关闭防火墙 systemctl stop(state) firewalld ```maven redis.clients jedis 2.9.0 ``` ### (1)测试连接 ```java public static void main(String[] args) { //1.创建Jedis对象 Jedis jedis = new Jedis("127.0.0.1", 6379); //2.测试是否连接成功 String ping = jedis.ping(); System.out.println(ping); //输出pong } ``` ### (2)key操作 ```java public static void main(String[] args) { //1.创建Jedis对象 Jedis jedis = new Jedis("127.0.0.1", 6379); //2.创建一个key-value String set = jedis.set("name", "suyuan");//返回ok //3.获取value String name = jedis.get("name");//suyuan //4.设置多个key-value String mset = jedis.mset("k1", "v1", "k2", "v2");//ok //5.输出所有key Set keys = jedis.keys("*"); System.out.println(keys); } ``` ### (3)数据类型操作 ```java public static void main(String[] args) { //1.创建Jedis对象 Jedis jedis = new Jedis("127.0.0.1", 6379); //2.左边添加 Long key1 = jedis.lpush("key1", "1", "11", "111"); //3 //3.左边获取 List key11 = jedis.lrange("key1", 0, -1); System.out.println(key11);//111,11,1 //a.添加set Long sadd = jedis.sadd("set1", "a", "b", "c","a");//3 Set set1 = jedis.smembers("set1"); System.out.println(set1);//[a, b, c] //b.哈希操作 Long hset = jedis.hset("users", "age", "20");//1 String hget = jedis.hget("users", "age"); System.out.println(hget);//20 //c.zset操作 jedis.zadd("china",100,"shanghai"); Set china = jedis.zrange("china", 0, -1); System.out.println(china); } ``` ### (4)模拟验证码发生 > 案例 > > 1.输入手机号,生成6为验证码,2分钟有效 > > 2.每个手机号一天只能获取3次验证码 ``` import redis.clients.jedis.Jedis; import java.util.Random; public class JedisDemo1 { public static void main(String[] args) { //1.获取验证码 VerifyPhoneGetCode("139789"); //245892 //2.判断验证码 // verifyCode("139789","245892"); } //3.验证验证码 public static void verifyCode(String phone,String code){ //1.建立jedis连接 Jedis jedis = new Jedis("", 6379); //2.获取redis的验证码 String codekey = "verify:"+phone+":code"; final String rediscode = jedis.get(codekey); if (code.equals(rediscode)){ System.out.println("验证成功"); }else { System.out.println("验证失败"); } } //2.通过手机号获取验证码 public static String VerifyPhoneGetCode(String phone){ //1.建立jedis连接 Jedis jedis = new Jedis("", 6379); //2.创建两个key,1个存放手机号当前获取几次验证码,1个存放当前手机号的验证码 String countkey = "verify:"+phone+":count"; String codekey = "verify:"+phone+":code"; //3.判断当前手机获取验证码次数 String count = jedis.get(countkey); if (count==null){//第一次 jedis.setex(countkey,60*60*24,"1");//设置第一次存放过期时间为一天 }else if (Long.parseLong(count)<3){ jedis.incr(countkey);//加1 }else { //超过三次 System.out.println(phone+":今日获取验证码次数已经超过3次"); jedis.close(); return "";//或者抛出异常 } //4.存放code,注意如果超过三次就不用存放code的,上面return或者抛出异常了 final String code = getCode(); System.out.println(phone+"存放redis的code:"+code); jedis.setex(codekey,60*2,code);//设置过期时间两分钟 return ""; } //1.生成6位验证码 public static String getCode(){ Random random = new Random(); String code=""; for (int i = 0; i < 6; i++) { int ran = random.nextInt(10); code +=ran; } return code; } } ``` ## 六、springboot整合redis ### (1)引入依赖 ```maven org.springframework.boot spring-boot-starter-data-redis org.apache.commons commons-pool2 ``` ### (2)添加配置文件 ```properties #Redis服务器地址 spring.redis.host=127.0.0.1 #Redis服务器连接端口 spring.redis.port=6379 #Redis数据库索引(默认位0) spring.redis.database=0 #连接超时时间(毫秒) spring.redis.timeout=1800000 #连接池最大连接数(使用负值表示没有限制) spring.redis.lettuce.pool.max-active=20 #最大阻塞等待时间(复数表示没有限制) spring.redis.lettuce.pool.max-wait=-1 #连接池中的最大空闲连接 spring.redis.lettuce.pool.max-idle=5 #连接池中的最小空闲连接 spring.redis.lettuce.pool.min-idle=0 ``` ### (3)添加配置类 ```java @EnableCaching @Configuration public class RedisConfig extends CachingConfigurerSupport { @Bean public RedisTemplate redisTemplate(RedisConnectionFactory factory) { RedisTemplate template = new RedisTemplate<>(); RedisSerializer redisSerializer = new StringRedisSerializer(); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setConnectionFactory(factory); //key序列化方式 template.setKeySerializer(redisSerializer); //value序列化 template.setValueSerializer(jackson2JsonRedisSerializer); //value hashmap序列化 template.setHashValueSerializer(jackson2JsonRedisSerializer); return template; } @Bean public CacheManager cacheManager(RedisConnectionFactory factory) { RedisSerializer redisSerializer = new StringRedisSerializer(); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); //解决查询缓存转换异常的问题 ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); // 配置序列化(解决乱码的问题),过期时间600秒 RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig() .entryTtl(Duration.ofSeconds(600)) .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer)) .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer)) .disableCachingNullValues(); RedisCacheManager cacheManager = RedisCacheManager.builder(factory) .cacheDefaults(config) .build(); return cacheManager; } } ``` ### (4)测试 ```java //我写在了AdminsController //http://localhost:8080/ayuancity/admins/redis @Autowired private RedisTemplate redisTemplate; //测试redis @GetMapping("/redis") public String testRedis(){ //1.设置值到redis redisTemplate.opsForValue().set("name","suyuanyuan"); //2.获得redis值 String name=(String) redisTemplate.opsForValue().get("name"); return name; } ``` ## 七、事务操作 ### (1)基本操作 > ·Redis事务是一个单独的隔离操作∶事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断 > > ·Redis事务的主要作用就是串联多个命令防止别的命令插队 > > ·multi:组队阶段;exec:执行阶段;discard:放弃组队 ![在这里插入图片描述](./redis%E5%AD%A6%E4%B9%A0.assets/20210705212449445.png) ``` 组队发生失败,则执行失败 组队成功,但个别命令失败,则只会发生个别命令失败 ``` ### (2)事物冲突 ``` 只有一万块钱,三个请求同时过来,最终造成金额为-4千 ``` ![1718331690968](redis%E5%AD%A6%E4%B9%A0.assets/1718331690968.png) > 解决办法 > > a.悲观锁:每次操作前先加锁,别人不能进行操作 > > b.乐观锁:添加一个版本号,每个操作都会修改版本号,别人操作时候查看版本号是否一致才能操作 ![1718331940784](redis%E5%AD%A6%E4%B9%A0.assets/1718331940784.png) ### (3)乐观锁的使用 > 使用场景:抢票等多读的场景 > > 语法: > > 1.在执行multi之前先执行watch key1[key2],监视一个或者多个key > > 2.unwath取消监视 ![1718332677665](redis%E5%AD%A6%E4%B9%A0.assets/1718332677665.png) ### (4)事务三特性 > 1.单独隔离操作 > > ​ 所有事物都会序列化,按顺序执行。事务在执行过程中不会被其他客户端发送来的命令打断 > > 2、没有隔离级别的概念 > > ​ 队列命令没有提交之前都不会被执行 > > 3.不保证原子性 > > ​ 事务中如果有一条命令执行失败,其他的命令仍然会执行,没有回滚 ### (5)事务和锁的秒杀案例 ![在这里插入图片描述](redis%E5%AD%A6%E4%B9%A0.assets/169c92b8f8ab4320999af26ea3f79891.png) #### a.前端页面 ```html <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> Insert title here

iPhone 13 Pro !!! 1元秒杀!!!

``` #### b.后端代码 ```java /** * 秒杀案例 */ public class SecKillServlet extends HttpServlet { private static final long serialVersionUID = 1L; public SecKillServlet() { super(); } protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { String userid = new Random().nextInt(50000) +"" ; String prodid =request.getParameter("prodid"); //boolean isSuccess=SecKill_redis.doSecKill(userid,prodid); boolean isSuccess= SecKill_redisByScript.doSecKill(userid,prodid); response.getWriter().print(isSuccess); } } ``` ```java public class SecKill_redis { public static void main(String[] args) { Jedis jedis =new Jedis("",6379); System.out.println(jedis.ping()); jedis.close(); } //秒杀过程 public static boolean doSecKill(String uid,String prodid) throws IOException { //1 uid和prodid非空判断 if(uid == null || prodid == null) { return false; } //2 连接redis Jedis jedis = new Jedis("",6379); //3 拼接key // 3.1 库存key String kcKey = "sk:"+prodid+":qt"; // 3.2 秒杀成功用户key String userKey = "sk:"+prodid+":user"; //4 获取库存,如果库存null,秒杀还没有开始 String kc = jedis.get(kcKey); if(kc == null) { System.out.println("秒杀还没有开始,请等待"); jedis.close(); return false; } // 5 判断用户是否重复秒杀操作 if(jedis.sismember(userKey, uid)) { System.out.println("已经秒杀成功了,不能重复秒杀"); jedis.close(); return false; } //因为kc为字符串,所以先转换城integer类型的 //6 判断如果商品数量,库存数量小于1,秒杀结束 if(Integer.parseInt(kc)<=0) { System.out.println("秒杀已经结束了"); jedis.close(); return false; } //7.1 库存-1 jedis.decr(kcKey); //7.2 把秒杀成功用户添加清单里面 jedis.sadd(userKey,uid); System.out.println("秒杀成功了.."); jedis.close(); return true; } } ``` > ==因为用户id是随机生成模拟的一次一次点击,如果同时点击可能会遇到bug== #### c.利用工具ab实现并发工具 > centos6:默认已经安装 > > centos7:yum install httpd-tools > > ab --help测试是否安装 > ab -n 请求数量 -c 并发数量 -p 提交参数(文件名,文件中写参数=值) -T 类型 请求路径 > > ab -n 2000 -c 200 -k -p ~/postfile -T application/x-www-form-urlencoded http://172.22.109.30:8081/Seckill/doseckill ==出现超卖和连接超时问题== ##### ①连接超时问题解决:使用连接池 ```java public class JedisPoolUtil { private static volatile JedisPool jedisPool = null; private JedisPoolUtil() { } public static JedisPool getJedisPoolInstance() { if (null == jedisPool) { synchronized (JedisPoolUtil.class) { if (null == jedisPool) { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(200); poolConfig.setMaxIdle(32); poolConfig.setMaxWaitMillis(100*1000); poolConfig.setBlockWhenExhausted(true); poolConfig.setTestOnBorrow(true); // ping PONG,判断是否还存在 jedisPool = new JedisPool(poolConfig, "172.22.109.205", 6379, 60000 ); } } } return jedisPool; } public static void release(JedisPool jedisPool, Jedis jedis) { if (null != jedis) { jedisPool.returnResource(jedis); } } } ``` ```java //2 连接redis //Jedis jedis = new Jedis("192.168.44.168",6379); //通过连接池得到jedis对象 JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance(); Jedis jedis = jedisPoolInstance.getResource(); ``` ##### ②超卖问题:利用乐观锁 ```java //加入一个监视的watch jedis.watch(kcKey); //4 获取库存,如果库存null,秒杀还没有开始 ......... //7 秒杀过程 //使用事务 Transaction multi = jedis.multi(); //组队操作 multi.decr(kcKey);//库存-1 multi.sadd(userKey,uid);//把秒杀成功用户添加清单里面 //执行 List results = multi.exec(); if(results == null || results.size()==0) { System.out.println("秒杀失败了...."); jedis.close(); return false; } //7.1 库存-1 //jedis.decr(kcKey); //7.2 把秒杀成功用户添加清单里面 //jedis.sadd(userKey,uid); System.out.println("秒杀成功了.."); jedis.close(); return true; ``` ##### ③乐观锁造成库存遗留问题 > 乐观锁造成版本号不一致,造成部分用户可能购买不了 > > ==使用Lua脚本语言== > > 将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数,提升性能(==好像就是利用了redis单线程,类似于悲观锁==) ```java public class SecKill_redisByScript { private static final org.slf4j.Logger logger =LoggerFactory.getLogger(SecKill_redisByScript.class) ; public static void main(String[] args) { JedisPool jedispool = JedisPoolUtil.getJedisPoolInstance(); Jedis jedis=jedispool.getResource(); System.out.println(jedis.ping()); Set set=new HashSet(); // doSecKill("201","sk:0101"); } static String secKillScript ="local userid=KEYS[1];\r\n" + "local prodid=KEYS[2];\r\n" + "local qtkey='sk:'..prodid..\":qt\";\r\n" + "local usersKey='sk:'..prodid..\":usr\";\r\n" + "local userExists=redis.call(\"sismember\",usersKey,userid);\r\n" + "if tonumber(userExists)==1 then \r\n" + " return 2;\r\n" + "end\r\n" + "local num= redis.call(\"get\" ,qtkey);\r\n" + "if tonumber(num)<=0 then \r\n" + " return 0;\r\n" + "else \r\n" + " redis.call(\"decr\",qtkey);\r\n" + " redis.call(\"sadd\",usersKey,userid);\r\n" + "end\r\n" + "return 1" ; static String secKillScript2 = "local userExists=redis.call(\"sismember\",\"{sk}:0101:usr\",userid);\r\n" + " return 1"; public static boolean doSecKill(String uid,String prodid) throws IOException { JedisPool jedispool = JedisPoolUtil.getJedisPoolInstance(); Jedis jedis=jedispool.getResource(); //String sha1= .secKillScript; String sha1= jedis.scriptLoad(secKillScript); Object result= jedis.evalsha(sha1, 2, uid,prodid); String reString=String.valueOf(result); if ("0".equals( reString ) ) { System.err.println("已抢空!!"); }else if("1".equals( reString ) ) { System.out.println("抢购成功!!!!"); }else if("2".equals( reString ) ) { System.err.println("该用户已抢过!!"); }else{ System.err.println("抢购异常!!"); } jedis.close(); return true; } } ``` ## 八、持久化操作RDB和AOP ### (1)RDB > 在指定的时间间隔内将内存中的数据集快照写入磁盘 > > 具体的备份流程如下: > > ​ Redis会单独创建((fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能。 > > 使用场景 > > ​ 如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效 > > 优点 > > ​ 适合大规模的数据恢复 > > ​ 对数据完整性和一致性要求不高更适合使用节省磁盘空间 > > ​ 恢复速度快 > > 缺点 > > ​ 内存数据被克隆了一份,会有两倍的膨胀性 > > ​ RDB的最后一次持久化后的数据可能丢失。 > > ​ > > 数据如果有变化的,会在/usr/local/bin目录下生成一个dum.rdb的文件 ```config save 秒 写操作 save 300 10 如果300秒内进行写操作10次就进行一次rdb保存 ``` ### (2)AOF > 默认不开启 > > appendonly no,改为yes > > 类似使用日记==记录非读==的操作 > 因为AOF会在文件采用追加==非读的操作==的形式记录操作,所以文件会越来越大,因此有一个重写机制,文件大小超过原文件的两倍会进行内容压缩,只保留可以恢复数据的最小指令集.可以使用命令bgrewriteaof > 重写文件也会fork一个临时文件,再将临时文件进行持久化 > 优点: > > 备份机制更稳健,丢失数据概率更低 > > 可读的日志文本,通过操作AOF稳健,可以处理误操作 > > 缺点: > > 比起RDB占用更多的磁盘空间。恢复备份速度要慢。 > > 每次读写都同步的话,有一定的性能压力。 > > 存在个别Bug,造成不能恢复 ==如果AOF和RDB同时开启,会使用AOF文件== ## 九、主从复制 > 主机数据更新后会根据配置和策略,自动同步到备机的master/slaver机制 > > master主机:以写为主 > > slave从机:以读为准 ![1718603411386](redis%E5%AD%A6%E4%B9%A0.assets/1718603411386.png) > 优点: > > 读写分离,性能扩展 > > 容灾快速恢复:一个从机宕机,会找下一个从机 > 主从复制的原理: > > 1.从机(slave)启动成功连接到主机(master)后,从机会发送一个sync(数据同步消息)命令 > > 2.主机接到命令启动后台的存盘进程(主服务器进行持久化生成rdb文件),同时收集所有用于修改数据集的命令,在后台执行完毕后,主机将传送整个数据文件(rdb文件)到从机完成一次同步 > > a.全量复制:刚连接主机,主机完成一次全部同步命令 > > b.增量复制:主机收到修改命令,会给从机修改的数据 ### (1)搭建过程 ``` >方法(1):复制三份配置文件,修改端口号启动 ``` > 方法(2):配置文件是可以使用include,包含相同的内容的;然后再使用三份配置文件修改不同的==端口号,pidfile,rdb==即可。 > > ①新建三个配置文件,起名redis6379.conf,redis6380.conf,redis6381.conf > > ```conf > include /myredis/redis.conf > pidfile /var/run/redis_6379.pid > port 6379 > dbfilename dump6379.rdb > > include /myredis/redis.conf > pidfile /var/run/redis_6380.pid > port 6380 > dbfilename dump6380.rdb > > > include /myredis/redis.conf > pidfile /var/run/redis_6381.pid > port 6381 > dbfilename dump6381.rdb > > ``` > > ②分别启动三台服务器 > > ```cmd > redis -server 配置文件 > > ``` > > ③登录客户端 > > ```cmd > redis -cli -p 端口号 > > ``` > > ④查看主机运行情况 > > ```客户端 > info replication > > ``` > > ⑤选择从机的客户端,添加主机 > > ```客户端 > slaveof <主机ip> <主机端口号> > 如:slaveof 127.0.0.1 6379 > > ``` > > 再通过第④步查看信息即可 ### (2)薪火相传 > 一个主机管理多个从机可能会比较麻烦,因此可以使用从机管理多个从机 ``` slaveof <从机ip> <从机端口号> 如:slaveof 127.0.0.1 6379 ``` ### (3)反客为主 > 主服务器宕机了,从机当主服务器 ``` slaveof no one 将从机变为主机,利用shutdown挂掉主机 缺点:得手动实现 ``` ### (4)哨兵模式 > 监听主机是否宕机,可以自动实现==反客为主== > 搭建过程:==已经搭建好一主多从的前提下== > > ①新建一个sentinel.conf文件。==名字不能错== > > ```sentinel > //mymaster是服务器的名字,ip,端口,1表示至少有多个个哨兵同意迁移的数量,有1个哨兵同意从机切换为主机,就切换,如果是2,那么就需要两个哨兵同意 > sentinel monitor mymaster 127.0.0.1 6379 1 > > ``` > > ②启动哨兵,==默认端口26379== > > ``` > redis -sentinel /sentinel.conf > > ``` > > 选举的规则: > > 1.优先级高的优先当主机,配置文件的slave-priority 100,值越小优先级越高==新版本的名字可能是replica-priority 100== > > 2.偏移量越大则优先当主机,偏移量是表示从机的数据与主机的数据相同量 > > 3.runid最小的优先当主机,每次启动redis系统随机生成的runid(40位) > > ==宕机的主机重启,会变成从机== > 缺点: > > 1.复制延迟: > > 因为所有的写都在主机上操作,然后同步更新到从机上,所以当系统繁忙或者从机数量过多会使这个问题更加严重 > > ```java //JAVA代码,实现主从复制 private static JedisSentinelPool jedisSentinelPool=null; public static Jedis getJedisFromSentinel(){ if(jedisSentinelPool==null){ Set sentinelSet=new HashSet<>(); sentinelSet.add("127.0.0.1:26379"); JedisPoolConfig jedisPoolConfig =new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(10); //最大可用连接数 jedisPoolConfig.setMaxIdle(5); //最大闲置连接数 jedisPoolConfig.setMinIdle(5); //最小闲置连接数 jedisPoolConfig.setBlockWhenExhausted(true); //连接耗尽是否等待 jedisPoolConfig.setMaxWaitMillis(2000); //等待时间 jedisPoolConfig.setTestOnBorrow(true); //取连接的时候进行一下测试 ping pong jedisSentinelPool=new JedisSentinelPool("mymaster",sentinelSet,jedisPoolConfig); return jedisSentinelPool.getResource(); }else{ return jedisSentinelPool.getResource(); } } ``` ## 十、集群 > redis集群实现了redis的水平扩容:即启动N个redis节点,每个节点存放总数据的1/N > > 代理主机:通过一台主机访问各个部分的模块 > > 无中心化集群:各个模块可以互相访问 ![1718676023412](redis%E5%AD%A6%E4%B9%A0.assets/1718676023412.png) #### (1)搭建 > 1.将之前的rdb,aof文件删除 > > 2.配置基本conf文件(相比之前多了开启集群,节点名字,超时时间),如我们配置三个主机,每个主机都有一个从机,则需要修改6个不同的conf文件 > > ```conf > include /myredis/redis.conf > pidfile "/var/run/redis_6391.pid" > port 6391 > dbfilename "dump6391.rdb" > cluster-enabled yes > cluster-config-file nodes-6391.conf > cluster-node-timeout 15000 > > > ``` > > 虚拟机中可用%s/6391/6380 将6391替换成6380 > > 3.将所有redis服务启动 > > 4.切换到recis的src目录中 > > 注意==要真实的ip== > > ==replicas 1 表示采用最简单的方式配置集群,一台主机一台从机== > > ``` > redis-cli --cluster create --cluster-replicas 1 192.168.242.110:6379 192.168.242.110:6380 192.168.242.110:6381 192.168.242.110:6389 192.168.242.110:6390 192.168.242.110:6391 > > ``` > > 5.集群的启动 要添加 -c > > ``` > redis -cli -c -p 6379 > > ``` > > ==主机和从机的分配原则是:== > > ==1.每个主节点尽量不在同一个ip;== > > ==2.主节点和从节点尽量不在同一个ip== #### (2)slots插槽 > 一个redis包含16384个插槽,每个如果键都属于16384个插槽的其中一个 > > 集群会平分16384个插槽,如节点A负责0~5460号插槽,节点B负责5461~10922号插槽,节点C负责10923~16383号插槽 > > 集群中 set k1 v1,则计算crc16(k1)%16384得到一个插槽号,则存放到某个节点中去 > > ``` > 查询集群中值:cluster keyslot k1 > 查询卡槽中12706以内key的数量,只能查本节点的插槽以内的值:cluster countkeyyinslot 12706 > 查询本卡槽号5474以内的值,返回前2个:cluster getkeyinslot 5474 2 > > ``` > > 注意==mset k1 v1 k2 v2==会报错,需要使用{}把他们放到同一个组里进行计算插槽的值 > > ``` > mset k1{user} v1 k2{user} v2 > > ``` #### (3)故障 > 如果主机宕机,从机则变成主机,原来的主机上线之后会变成从机 > > 配置中:cluster-require -full -coverage > > ·yes:如果主机和从机都宕机了,则整个集群都挂掉 > > ·no: 如果主机和从机都宕机了,则该插槽不能使用(不能存储和读取) #### (4)Jedis开发 > java操作集群 > > 优点:扩容,分摊压力,无中心化配置相对简单 > > 缺点:多键操作不被支持,多键的redis事务和lua脚本不被支持 ```java public class JedisClusterTest { public static void main(String[] args) { HostAndPort hostAndPort = new HostAndPort("127.0.0.1", 6379); JedisCluster jedisCluster = new JedisCluster(hostAndPort); jedisCluster.set("k5","v5"); String k5 = jedisCluster.get("k5"); System.out.println(k5); } } ``` ## 十一、应用问题 #### (1)缓存穿透 > redis命中率降低,一直查询数据库。导致数据库崩溃 > 解决办法 > > 1.缓存空置,过期时间要短 > > 2.设置可访问的名单:使用bitmaps定义一个可以访问的名单 > > 3.采用布隆过滤器:底层就是bitmaps,但有时候命中率不高 > > 4.进行人工实时监控,查看是不是有黑客攻击,设置黑名单进行限制服务 #### (2)缓存击穿 > 数据库访问压力瞬时增加 > > redis里面没有出现大量key过期 > > redis正常运行 > > 其实就是:某个大量使用的key过期了 > 解决办法: > > 1.预先设置热门数据:在高峰访问之前,提前添加热门数据,并加大过期时长 > > 2.实时调整:实时调整key的过期时长 > > 3.使用锁:如果redis查询为空则设置一个锁,防止大量查询进入,等查询成功再删除锁==缺点:效率很低== #### (3)缓存雪崩 > 数据库压力变大,服务器崩溃 > > 在极少时间段,查询大量的key集中过期(注:雪崩是大量key过期,击穿是热门key过期) > 解决办法 > > 1.构建多级缓存架构:nginx缓存+redis缓存+其他缓存如ehcache > > 2.使用锁或者队列 > > 3.设置过期标志更新缓存:如果缓存过期,出发另外的线程在后台更新key的缓存 > > 4.将缓存失效时间分散开 #### (4)分布式锁 > 在分布式或者集群中,如果单独对一台机器创建了锁,这个锁不能对所有机器生效; > > 因此分布式锁让所有机器都生效 > 使用redis实现分布式锁 > > 1.setnx: 通过删掉key释放锁,设置key的过期时间,即可按时间释放锁 > > 2.边创建边设置过期时间:set name xiaohong nx ex 10 ==nx为上锁 ex为过期时间== ```java @GetMapping("testLock") public void testLock(){ //1获取锁,setnx ,顺便设置过期时间 Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111",3,TimeUnit.SECONDS); //2获取锁成功、查询num的值 if(lock){ Object value = redisTemplate.opsForValue().get("num");//提前在redis中set num 0 //2.1判断num为空return if(StringUtils.isEmpty(value)){ return; } //2.2有值就转成成int int num = Integer.parseInt(value+""); //2.3把redis的num加1 redisTemplate.opsForValue().set("num", ++num); //2.4释放锁,del redisTemplate.delete("lock"); }else{ //3获取锁失败、每隔0.1秒再获取 try { Thread.sleep(100); testLock(); } catch (InterruptedException e) { e.printStackTrace(); } } } ``` > 误删问题: > > 采用 setnx lock 111可能会导致锁被误删; > > 应该采用setnx lock uuid,释放锁的时候要判断当前的uuid和lock的uuid是否一致 ![1718713787030](redis%E5%AD%A6%E4%B9%A0.assets/1718713787030.png) ```java @GetMapping("testLock") public void testLock(){ String uuid = UUID.randomUUID().toString(); //1获取锁,setne ,顺便设置过期时间 Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,3,TimeUnit.SECONDS); //2获取锁成功、查询num的值 if(lock){ ... //2.3把redis的num加1 redisTemplate.opsForValue().set("num", ++num); //2.4释放锁,del String lockUuid = (String)redisTemplate.opsForValue().get("lock"); if(uuid.equals(lockUuid)){ redisTemplate.delete("lock"); } }else{ ... } } ``` > 误删问题: > > 删除的原子性问题 ![1718714246096](redis%E5%AD%A6%E4%B9%A0.assets/1718714246096.png) > 通过lua脚本,产生原子性 ```java @GetMapping("testLockLua") public void testLockLua() { //1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中 String uuid = UUID.randomUUID().toString(); //2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除! String skuId = "25"; // 访问skuId 为25号的商品 100008348542 String locKey = "lock:" + skuId; // 锁住的是每个商品的数据 // 3 获取锁 Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS); // 第一种: lock 与过期时间中间不写任何的代码。 // redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间 // 如果true if (lock) { // 执行的业务逻辑开始 // 获取缓存中的num 数据 Object value = redisTemplate.opsForValue().get("num"); // 如果是空直接返回 if (StringUtils.isEmpty(value)) { return; } // 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在! int num = Integer.parseInt(value + ""); // 使num 每次+1 放入缓存 redisTemplate.opsForValue().set("num", String.valueOf(++num)); /*使用lua脚本来锁*/ // 定义lua 脚本 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; // 使用redis执行lua执行 DefaultRedisScript redisScript = new DefaultRedisScript<>(); redisScript.setScriptText(script); // 设置一下返回值类型 为Long // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型, // 那么返回字符串与0 会有发生错误。 redisScript.setResultType(Long.class); // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。 redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid); } else { // 其他线程等待 try { // 睡眠 Thread.sleep(1000); // 睡醒了之后,调用方法。 testLockLua(); } catch (InterruptedException e) { e.printStackTrace(); } } } ``` > 总结 > > 确保锁的实现要同时满足以下四个条件 > > 1互斥性,任意时刻,只有一个客户端有锁 > > 2不会发生死锁,即使有一个客户端在有锁的情况下崩溃而没有主动解锁,也能保证其他客户端能加锁 > > 3解铃还须系铃人,加锁和解锁必须是同一个客户端,客户端自己不能把别人的锁给解了 > > 4加锁和解锁必须有原子性 ## 十二、@Cacheable的使用 ### (1)使用cache > 1.开启注解@EnableCaching > > ```java > @Configuration > @EnableCaching > public class CachingConfig { > > } > > ``` > > 2.使用@Cacheable、@CachePut、@CacheEvict即可 > > ==注意可能出现序列化问题:可能需要和redis配置文件中一样添加CacheManager的配置== ### (2)使用redis作为缓存 > ==需要完成第六章springboot整合redis的配置== > > @Cacheable`注解用于标注需要缓存的方法。当该方法被调用时,Spring Cache会先检查缓存中是否存在对应的数据。如果存在,则直接返回缓存数据;如果不存在,则执行方法并将结果存入缓存。 > > `@CachePut`注解用于标注需要更新缓存的方法。即使缓存中已经存在数据,该方法仍然会执行,并将结果更新到缓存中。 > > `@CacheEvict`注解用于标注需要清除缓存的方法。当该方法被调用时,Spring Cache会清除对应的缓存数据。 ```java //测试缓存 http://localhost:8080/ayuancity/admins/cache @GetMapping("/cache") @Cacheable(value = "adminlist",key = "'abc'") //注意里面字符要用单引号 public YuancityView testcache(){ System.out.println("我会进来读取数据库"); PageInfo list = AdminsService.selectAdminsList(1,5,new Admins()); return YuancityView.success("查询成功").put("data", list); } ``` ![1718787657860](redis%E5%AD%A6%E4%B9%A0.assets/1718787657860.png) ```java @GetMapping(value = "/{adminId}") @Cacheable(value = "adminlist",key = "#adminId") //使用查询id作为键名 public YuancityView getInfo(@PathVariable("adminId") Long adminId) { return YuancityView.success("查询成功").put("data", AdminsService.selectAdminsByAdminId(adminId)); } ``` ![1718789296532](redis%E5%AD%A6%E4%B9%A0.assets/1718789296532.png)