[TOC] # redis-spring-boot-starter 前面项目中,咱们使用了[db-core]([https://www.kancloud.cn/owenwangwen/open-capacity-platform/1048264](https://www.kancloud.cn/owenwangwen/open-capacity-platform/1048264))为整个项目提供通用的数据库处理, db-core提供了druid数据源,mybatis的操作,也支持了redis的操作,功能不单一,将代码分离,自定义redis-spring-boot-starter ## 功能 * lettuce连接池 * redis操作工具类 * geohash 经纬度定位 * redisson分布式锁 ## 代码分析 * 代码一览 ![](https://img.kancloud.cn/1e/20/1e204bc6b960a331c66f52b5cdca4064_1629x704.png) * RedisAutoConfig ![](https://img.kancloud.cn/5b/b2/5bb2e5b571c09be61c7abd11eb3fb2f0_1749x833.png) 回顾db-spring-boot-starter的章节 * @EnableAutoConfiguration 作用:从classpath中搜索所有META-INF/spring.factories配置文件 并且其中org.springframework.boot.autoconfigure.EnableAutoConfiguration key对应的配置项加载到spring容器 ## 序列化 ### redis序列化配置 ![](https://img.kancloud.cn/d0/ba/d0bae96e680ae6fa2908175ede96f979_509x375.png) ### oauth序列化配置 ![](https://img.kancloud.cn/0b/3e/0b3e61a7c9cc283adc99d243e401ce76_534x726.png) ### 测试脚本 原生序列化方案[序列化100000次]耗时:14389ms size:=1276000000 原生序列化方案[反序列化100000次]耗时:16298ms size:=1276000000 hessian序列化方案[序列化100000次]耗时:10479ms size:=792000000 hessian序列化方案[反序列化100000次]耗时:5480ms size:=792000000 Kryo序列化方案[序列化100000次]耗时:9330ms size:=1013000000 Kryo序列化方案[反序列化100000次]耗时:8582ms size:=1013000000 ``` public static void main(String[] args) { Map map = new HashMap(); for(int i = 0 ; i <=100 ; i++) { map.put("a"+i, "a"+i); } Serializer jdkSerializer = SerializerManager.getSerializer(SerializerManager.JDK) ; Serializer kryoObjectSerializer = SerializerManager.getSerializer(SerializerManager.KRYO) ; Serializer hessianSerializer = SerializerManager.getSerializer(SerializerManager.HESSIAN2) ; long size = 0; long time1 = System.currentTimeMillis(); byte[] jdkserialize = null ; byte[] redisserialize = null ; byte[] kryoserialize = null ; for (int i = 0; i < 1000000; i++) { jdkserialize =jdkSerializer.serialize(map); size += jdkserialize.length; } System.out.println("原生序列化方案[序列化100000次]耗时:" + (System.currentTimeMillis() - time1) + "ms size:=" + size); long time2 = System.currentTimeMillis(); for (int i = 0; i < 1000000; i++) { Map aa =(Map) jdkSerializer.deserialize(jdkserialize) ; } System.out.println("原生序列化方案[反序列化100000次]耗时:" + (System.currentTimeMillis() - time2) + "ms size:=" + size); long time3 = System.currentTimeMillis(); size = 0; for (int i = 0; i < 1000000; i++) { redisserialize =hessianSerializer.serialize(map); size += redisserialize.length; } System.out.println("hessian序列化方案[序列化100000次]耗时:" + (System.currentTimeMillis() - time3) + "ms size:=" + size); long time4 = System.currentTimeMillis(); for (int i = 0; i < 1000000; i++) { Map aa =(Map) hessianSerializer.deserialize(redisserialize) ; } System.out.println("hessian序列化方案[反序列化100000次]耗时:" + (System.currentTimeMillis() - time4) + "ms size:=" + size); long time5 = System.currentTimeMillis(); size = 0; for (int i = 0; i < 1000000; i++) { kryoserialize =kryoObjectSerializer.serialize(map); size += kryoserialize.length; } System.out.println("Kryo序列化方案[序列化100000次]耗时:" + (System.currentTimeMillis() - time5) + "ms size:=" + size); long time6 = System.currentTimeMillis(); for (int i = 0; i < 1000000; i++) { Map aa =(Map) kryoObjectSerializer.deserialize(kryoserialize) ; } System.out.println("Kryo序列化方案[反序列化100000次]耗时:" + (System.currentTimeMillis() - time6) + "ms size:=" + size); } ``` ## redis cluster中的代码优化 项目中 3主3从 redis集群出现单节点宕机,造成master迁移,但是发现应用无法正常连接redis ,分析了代码,发现默认Lettuce是不会刷新拓扑io.lettuce.core.cluster.models.partitions.Partitions#slotCache,最终造成槽点查找节点依旧找到老的节点,自然访问不了了。 ![](https://img.kancloud.cn/a1/00/a1005083f3cb1daa2df0017143559843_1711x668.png) ## geohash 经纬度定位 ~~~ /** * 添加经纬度信息 map.put("北京" ,new Point(116.405285 ,39.904989)) //redis 命令:geoadd * cityGeo 116.405285 39.904989 "北京" */ public Long addGeoPoint(String key, Map<Object, Point> map) { return redisTemplate.opsForGeo().add(key, map); } /** * 查找指定key的经纬度信息 redis命令:geopos cityGeo 北京 * * @param key * @param member * @return */ public Point geoGetPoint(String key, String member) { List<Point> lists = redisTemplate.opsForGeo().position(key, member); return lists.get(0); } /** * 返回两个地方的距离,可以指定单位 redis命令:geodist cityGeo 北京 上海 * * @param key * @param srcMember * @param targetMember * @return */ public Distance geoDistance(String key, String srcMember, String targetMember) { Distance distance = redisTemplate.opsForGeo().distance(key, srcMember, targetMember, Metrics.KILOMETERS); return distance; } /** * 根据指定的地点查询半径在指定范围内的位置 redis命令:georadiusbymember cityGeo 北京 100 km WITHDIST * WITHCOORD ASC COUNT 5 * * @param key * @param member * @param distance * @return */ public GeoResults geoRadiusByMember(String key, String member, double distance) { return redisTemplate.opsForGeo().radius(key, member, new Distance(distance, Metrics.KILOMETERS), RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().includeCoordinates() .sortAscending()); } /** * 根据给定的经纬度,返回半径不超过指定距离的元素 redis命令:georadius cityGeo 116.405285 39.904989 * 100 km WITHDIST WITHCOORD ASC COUNT 5 * * @param key * @param circle * @param distance * @return */ public GeoResults geoRadiusByCircle(String key, Circle circle, double distance) { return redisTemplate.opsForGeo().radius(key, circle, new Distance(distance, Metrics.KILOMETERS), RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().includeCoordinates() .sortAscending()); } //删除元素 public Long deleteGeoMember(String key, String member) { return redisTemplate.boundZSetOps(key).remove(member); } ~~~ ## 分布式锁redssion ### 集成方式 ![](https://img.kancloud.cn/2d/60/2d6097bd35c5ccb4c2bb9ec34d3cef75_1610x600.png) ### 大致使用 ![](https://img.kancloud.cn/18/d5/18d5a5e294b767fbd1330e3b19f20f16_1366x494.png) ### 代码分析 * 获取锁 ![](https://img.kancloud.cn/5c/d1/5cd16cf63729b7c861e4b8f764d7d746_1218x257.png) > 调用getLock()方法后实际返回一个RedissonLock对象 * 加锁 ![](https://img.kancloud.cn/b7/76/b77653f1bb4d644f77d8172d71e6c22d_1263x810.png) ![](https://img.kancloud.cn/a5/4a/a54ab6fa5c42be0d670d3d9ef545442c_1201x805.png) > 在RedissonLock对象的lock()方法主要调用tryAcquire()方法,由于leaseTime == -1,于是走tryLockInnerAsync()方法, * 加锁细节 ![](https://img.kancloud.cn/57/58/5758163e561c3b3032b836fd9a5f58ff_1118x366.png) > 结合上面的参数声明,我们可以知道,这里KEYS\[1\]就是getName(),ARGV\[2\]是getLockName(threadId),假设前面获取锁时传的name是“anyLock”,假设调用的线程ID是Thread-1,假设成员变量UUID类型的id是85b196ce-e6f2-42ff-b3d7-6615b6748b5d:65那么KEYS\[1\]=anyLock,ARGV\[2\]=85b196ce-e6f2-42ff-b3d7-6615b6748b5d:Thread-1 ,因此,这段脚本的意思是1、判断有没有一个叫“anyLock”的key2、如果没有,则在其下设置一个字段为“85b196ce-e6f2-42ff-b3d7-6615b6748b5d:Thread-1”,值为“1”的键值对 ,并设置它的过期时间3、如果存在,则进一步判断“85b196ce-e6f2-42ff-b3d7-6615b6748b5d:Thread-1”是否存在,若存在,则其值加1,并重新设置过期时间4、返回“anyLock”的生存时间(毫秒) * 加锁redis结构 ![](https://img.kancloud.cn/12/b2/12b2c938d03de90b5b38ff12c467089c_1253x220.png) > 这里用的数据结构是hash,hash的结构是: key  字段1  值1 字段2  值2  。。。用在锁这个场景下,key就表示锁的名称,也可以理解为临界资源,字段就表示当前获得锁的线程所有竞争这把锁的线程都要判断在这个key下有没有自己线程的字段,如果没有则不能获得锁,如果有,则相当于重入,字段值加1(次数) * 解锁 ![](https://img.kancloud.cn/14/de/14dee1fe94bd01a114c6235c208022be_1210x665.png) > 我们还是假设name=anyLock,假设线程ID是Thread-1,同理,我们可以知道KEYS\[1\]是getName(),即KEYS\[1\]=anyLock,KEYS\[2\]是getChannelName(),即KEYS\[2\]=redisson\_lock\_\_channel:{anyLock},ARGV\[1\]是LockPubSub.unlockMessage,即ARGV\[1\]=0,ARGV\[2\]是生存时间,ARGV\[3\]是getLockName(threadId),即ARGV\[3\]=85b196ce-e6f2-42ff-b3d7-6615b6748b5d:Thread-1,因此,上面脚本的意思是:1、判断是否存在一个叫“anyLock”的key2、如果不存在,向Channel中广播一条消息,广播的内容是0,并返回1。3、如果存在,进一步判断字段85b196ce-e6f2-42ff-b3d7-6615b6748b5d:Thread-1是否存在。4、若字段不存在,返回空,若字段存在,则字段值减1,5、若减完以后,字段值仍大于0,则返回0。6、减完后,若字段值小于或等于0,则广播一条消息,广播内容是0,并返回1;可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了 * 等待 ``` private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } RFuture<RedissonLockEntry> future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } try { while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(future, threadId); } // get(lockAsync(leaseTime, unit)); } ``` > 这里会订阅Channel,当资源可用时可以及时知道,并抢占,防止无效的轮询而浪费资源当资源可用用的时候,循环去尝试获取锁,由于多个线程同时去竞争资源,所以这里用了信号量,对于同一个资源只允许一个线程获得锁,其它的线程阻塞 * 总结 ![](https://img.kancloud.cn/0c/e5/0ce569c7fef58d1f9752ba39979f0a72_1740x788.png) ![](https://img.kancloud.cn/7e/fb/7efb414cf1715a4aa609b11ee3924170_607x384.png) ## Cache Aside Pattern * 读的时候,先读缓存,缓存没有的话,那么就读数据库,然后取出数据后放入缓存,同时返回响应 ![](https://img.kancloud.cn/29/c6/29c6bf333087ad69dedeb0a657df5966_1827x709.png) * 更新的时候,先删除缓存,然后再更新数据库