一.固定大小缓存实现 缓存接口定义 为了兼容 Map,我们定义缓存接口继承自 Map 接口。
1 2 3 4 5 public interface ICache <K , V > extends Map <K , V > {}
核心实现 我们主要看一下 put 时的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Override public V put (K key, V value) { CacheEvictContext<K,V> context = new CacheEvictContext<>(); context.key(key).size(sizeLimit).cache(this ); cacheEvict.evict(context); if (isSizeLimit()) { throw new CacheRuntimeException("当前队列已满,数据添加失败!" ); } return map.put(key, value); }
这里我们可以让用户动态指定大小,但是指定大小肯就要有对应的淘汰策略。
否则,固定大小的 map 肯定无法放入元素。
淘汰策略 淘汰策略可以有多种,比如 LRU/LFU/FIFO 等等,我们此处实现一个最基本的 FIFO。
所有实现以接口的方式实现,便于后期灵活替换。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class CacheEvictFIFO <K ,V > implements ICacheEvict <K ,V > { private Queue<K> queue = new LinkedList<>(); @Override public void evict (ICacheEvictContext<K, V> context) { final ICache<K,V> cache = context.cache(); if (cache.size() >= context.size()) { K evictKey = queue.remove(); cache.remove(evictKey); } final K key = context.key(); queue.add(key); } }
FIFO 比较简单,我们使用一个队列,存储每一次放入的元素,当队列超过最大限制时,删除最早的元素。
引导类 为了便于用户使用,我们实现类似于 guava 的引导类。
所有参数都提供默认值,使用 fluent 流式写法,提升用户体验。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 public final class CacheBs <K ,V > { private CacheBs () {} public static <K,V> CacheBs<K,V> newInstance () { return new CacheBs<>(); } private Map<K,V> map = new HashMap<>(); private int size = Integer.MAX_VALUE; private ICacheEvict<K,V> evict = CacheEvicts.fifo(); public CacheBs<K, V> map (Map<K, V> map) { ArgUtil.notNull(map, "map" ); this .map = map; return this ; } public CacheBs<K, V> size (int size) { ArgUtil.notNegative(size, "size" ); this .size = size; return this ; } public CacheBs<K, V> evict (ICacheEvict<K, V> evict) { this .evict = evict; return this ; } public ICache<K,V> build () { CacheContext<K,V> context = new CacheContext<>(); context.cacheEvict(evict); context.map(map); context.size(size); return new Cache<>(context); } }
测试使用 1 2 3 4 5 6 7 8 9 ICache<String, String> cache = CacheBs.<String,String>newInstance() .size(2 ) .build(); cache.put("1" , "1" ); cache.put("2" , "2" ); cache.put("3" , "3" ); cache.put("4" , "4" ); Assert.assertEquals(2 , cache.size()); System.out.println(cache.keySet());
默认为先进先出的策略,此时输出 keys,内容如下:
二.LRU缓存淘汰策略 上面默认使用FIFO淘汰策略即先进先淘汰,下我们来开发LRU缓存淘汰策略即淘汰最近最少使用
LRU基本原理 1.LRU 是什么 LRU 是由 Least Recently Used 的首字母组成,表示最近最少使用的含义,一般使用在对象淘汰算法上。
也是比较常见的一种淘汰算法。
其核心思想是如果数据最近被访问过,那么将来被访问的几率也更高 。
2.连续性 在计算机科学中,有一个指导准则:连续性准则。
时间连续性:对于信息的访问,最近被访问过,被再次访问的可能性会很高。缓存就是基于这个理念进行数据淘汰的。
空间连续性:对于磁盘信息的访问,将很有可能访问连续的空间信息。所以会有 page 预取来提升性能。
3.实现步骤
新数据插入到链表头部;
每当缓存命中(即缓存数据被访问),则将数据移到链表头部;
当链表满的时候,将链表尾部的数据丢弃。
其实比较简单,比起 FIFO 的队列,我们引入一个链表实现即可。
5.LRU实现步骤疑点 我们针对上面的 3 句话,逐句考虑一下,看看有没有值得优化点或者一些坑。
如何判断是新数据?
新数据插入到链表头部;
我们使用的是链表。
判断新数据最简单的方法就是遍历是否存在,对于链表,这是一个 O(n) 的时间复杂度。
其实性能还是比较差的。
当然也可以考虑空间换时间,比如引入一个 set 之类的,不过这样对空间的压力会加倍。
什么是缓存命中
每当缓存命中(即缓存数据被访问),则将数据移到链表头部;
put(key,value) 的情况,就是新元素。如果已有这个元素,可以先删除,再加入,参考上面的处理。
get(key) 的情况,对于元素访问,删除已有的元素,将新元素放在头部。
remove(key) 移除一个元素,直接删除已有元素。
keySet() valueSet() entrySet() 这些属于无差别访问,我们不对队列做调整。
移除
当链表满的时候,将链表尾部的数据丢弃。
链表满只有一种场景,那就是添加元素的时候,也就是执行 put(key, value) 的时候。
直接删除对应的 key 即可。
Java 代码实现 1.接口定义 和 FIFO 的接口保持一致,调用地方也不变
为了后续 LRU/LFU 实现,新增 remove/update 两个方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public interface ICacheEvict <K , V > { boolean evict (final ICacheEvictContext<K, V> context) ; void update (final K key) ; void remove (final K key) ; }
2.LRU 实现 直接基于 LinkedList 实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public class CacheEvictLRU <K ,V > implements ICacheEvict <K ,V > { private static final Log log = LogFactory.getLog(CacheEvictLRU.class); private final List<K> list = new LinkedList<>(); @Override public boolean evict (ICacheEvictContext<K, V> context) { boolean result = false ; final ICache<K,V> cache = context.cache(); if (cache.size() >= context.size()) { K evictKey = list.get(list.size()-1 ); cache.remove(evictKey); result = true ; } return result; } @Override public void update (final K key) { this .list.remove(key); this .list.add(0 , key); } @Override public void remove (final K key) { this .list.remove(key); } }
实现比较简单,相对 FIFO 多了三个方法:
update():我们做一点简化,认为只要是访问,就是删除,然后插入到队首。
remove():删除就是直接删除。
这三个方法是用来更新最近使用情况的。
那什么时候调用呢?
3.注解属性 为了保证核心流程,我们基于注解实现。
添加属性:
1 2 3 4 5 6 7 boolean evict () default false ;
4.注解使用 有哪些方法需要使用?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Override @CacheInterceptor(refresh = true, evict = true) public boolean containsKey (Object key) { return map.containsKey(key); } @Override @CacheInterceptor(evict = true) @SuppressWarnings("unchecked") public V get (Object key) { K genericKey = (K) key; this .expire.refreshExpire(Collections.singletonList(genericKey)); return map.get(key); } @Override @CacheInterceptor(aof = true, evict = true) public V put (K key, V value) { } @Override @CacheInterceptor(aof = true, evict = true) public V remove (Object key) { return map.remove(key); }
5.注解驱除拦截器实现 执行顺序:放在方法之后更新,不然每次当前操作的 key 都会被放在最前面。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class CacheInterceptorEvict <K ,V > implements ICacheInterceptor <K , V > { private static final Log log = LogFactory.getLog(CacheInterceptorEvict.class); @Override public void before (ICacheInterceptorContext<K,V> context) { } @Override @SuppressWarnings("all") public void after (ICacheInterceptorContext<K,V> context) { ICacheEvict<K,V> evict = context.cache().evict(); Method method = context.method(); final K key = (K) context.params()[0 ]; if ("remove" .equals(method.getName())) { evict.remove(key); } else { evict.update(key); } } }
我们只对 remove 方法做下特判,其他方法都使用 update 更新信息。
参数直接取第一个参数。
测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 ICache<String, String> cache = CacheBs.<String,String>newInstance() .size(3 ) .evict(CacheEvicts.<String, String>lru()) .build(); cache.put("A" , "hello" ); cache.put("B" , "world" ); cache.put("C" , "FIFO" ); cache.get("A" ); cache.put("D" , "LRU" ); Assert.assertEquals(3 , cache.size()); System.out.println(cache.keySet());
通过 removeListener 日志也可以看到 B 被移除了:
1 [DEBUG] [2020 -10 -02 21 :33 :44.578 ] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key:
三.LRU缓存策略的优化 数据结构选择 1.基于数组 方案:为每一个数据附加一个额外的属性——时间戳,当每一次访问数据时,更新该数据的时间戳至当前时间。
当数据空间已满后,则扫描整个数组,淘汰时间戳最小的数据。
不足:维护时间戳需要耗费额外的空间,淘汰数据时需要扫描整个数组。
这个时间复杂度太差,空间复杂度也不好。
2.基于长度有限的双向链表 方案:访问一个数据时,当数据不在链表中,则将数据插入至链表头部,如果在链表中,则将该数据移至链表头部。当数据空间已满后,则淘汰链表最末尾的数据。
不足:插入数据或取数据时,需要扫描整个链表。
这个就是我们上一节实现的方式,缺点还是很明显,每次确认元素是否存在,都要消耗 O(n) 的时间复杂度去查询。
3.基于双向链表和哈希表 方案:为了改进上面需要扫描链表的缺陷,配合哈希表,将数据和链表中的节点形成映射,将插入操作和读取操作的时间复杂度从O(N)降至O(1)
缺点:这个使我们上一节提到的优化思路,不过还是有缺点的,那就是空间复杂度翻倍。
4.数据结构的选择总结 (1)基于数组的实现
这里不建议选择 array 或者 ArrayList,因为读取的时间复杂度为 O(1),但是更新相对是比较慢的,虽然 jdk 使用的是 System.arrayCopy。
(2)基于双向链表的实现
如果我们选择链表,HashMap 中还是不能简单的存储 key, 和对应的下标。
因为链表的遍历,实际上还是 O(n) 的,双向链表理论上可以优化一半,但是这并不是我们想要的 O(1) 效果。
(3)基于双向链表 + Map实现
双向链表我们保持不变。
Map 中 key 对应的值我们放双向链表的节点信息。
那实现方式就变成了实现一个双向链表。
基于自定义双向链表实现 1.节点定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class DoubleListNode <K ,V > { private K key; private V value; private DoubleListNode<K,V> pre; private DoubleListNode<K,V> next; }
2.核心代码实现 我们保持和原来的接口不变,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 public class CacheEvictLruDoubleListMap <K ,V > extends AbstractCacheEvict <K ,V > { private static final Log log = LogFactory.getLog(CacheEvictLruDoubleListMap.class); private DoubleListNode<K,V> head; private DoubleListNode<K,V> tail; private Map<K, DoubleListNode<K,V>> indexMap; public CacheEvictLruDoubleListMap () { this .indexMap = new HashMap<>(); this .head = new DoubleListNode<>(); this .tail = new DoubleListNode<>(); this .head.next(this .tail); this .tail.pre(this .head); } @Override protected ICacheEntry<K, V> doEvict (ICacheEvictContext<K, V> context) { ICacheEntry<K, V> result = null ; final ICache<K,V> cache = context.cache(); if (cache.size() >= context.size()) { DoubleListNode<K,V> tailPre = this .tail.pre(); if (tailPre == this .head) { log.error("当前列表为空,无法进行删除" ); throw new CacheRuntimeException("不可删除头结点!" ); } K evictKey = tailPre.key(); V evictValue = cache.remove(evictKey); result = new CacheEntry<>(evictKey, evictValue); } return result; } @Override public void update (final K key) { this .remove(key); DoubleListNode<K,V> newNode = new DoubleListNode<>(); newNode.key(key); DoubleListNode<K,V> next = this .head.next(); this .head.next(newNode); newNode.pre(this .head); next.pre(newNode); newNode.next(next); indexMap.put(key, newNode); } @Override public void remove (final K key) { DoubleListNode<K,V> node = indexMap.get(key); if (ObjectUtil.isNull(node)) { return ; } DoubleListNode<K,V> pre = node.pre(); DoubleListNode<K,V> next = node.next(); pre.next(next); next.pre(pre); this .indexMap.remove(key); } }
实现起来不难,就是一个简易版本的双向列表。
只是获取节点的时候,借助了一下 map,让时间复杂度降低为 O(1)。
3.测试结果 我们验证一下自己的实现:
测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ICache<String, String> cache = CacheBs.<String,String>newInstance() .size(3 ) .evict(CacheEvicts.<String, String>lruDoubleListMap()) .build(); cache.put("A" , "hello" ); cache.put("B" , "world" ); cache.put("C" , "FIFO" ); cache.get("A" ); cache.put("D" , "LRU" ); Assert.assertEquals(3 , cache.size()); System.out.println(cache.keySet());
日志
1 2 [DEBUG] [2020 -10 -03 09:37 :41.007 ] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict [D, A, C]
因为我们访问过一次 A,所以 B 已经变成最少被访问的元素。
基于 LinkedHashMap 实现 实际上,LinkedHashMap 本身就是对于 list 和 hashMap 的一种结合的数据结构,我们可以直接使用 jdk 中 LinkedHashMap 去实现。
1.直接实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class LRUCache extends LinkedHashMap { private int capacity; public LRUCache (int capacity) { super (16 , 0.75f , true ); this .capacity = capacity; } @Override protected boolean removeEldestEntry (Map.Entry eldest) { return super .size() >= capacity; } }
默认LinkedHashMap并不会淘汰数据,所以我们重写了它的removeEldestEntry()方法,当数据数量达到预设上限后,淘汰数据,accessOrder设为true意为按照访问的顺序排序。
整个实现的代码量并不大,主要都是应用LinkedHashMap的特性。
2.简单改造 我们对这个方法简单改造下,让其适应我们定义的接口。
3.测试结果 测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 ICache<String, String> cache = CacheBs.<String,String>newInstance() .size(3 ) .evict(CacheEvicts.<String, String>lruLinkedHashMap()) .build(); cache.put("A" , "hello" ); cache.put("B" , "world" ); cache.put("C" , "FIFO" ); cache.get("A" ); cache.put("D" , "LRU" ); Assert.assertEquals(3 , cache.size()); System.out.println(cache.keySet());
日志
1 2 [DEBUG] [2020 -10 -03 10 :20 :57.842 ] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict [D, A, C]
LRU扩展算法 当存在热点数据时,LRU的效率很好,但偶发性的、周期性的批量操作会导致LRU命中率急剧下降,缓存污染情况比较严重。
1. LRU-K LRU-K中的K代表最近使用的次数,因此LRU可以认为是LRU-1。
LRU-K的主要目的是为了解决LRU算法“缓存污染”的问题,其核心思想是将“最近使用过1次”的判断标准扩展为“最近使用过K次”。
相比LRU,LRU-K需要多维护一个队列,用于记录所有缓存数据被访问的历史。只有当数据的访问次数达到K次的时候,才将数据放入缓存。
当需要淘汰数据时,LRU-K会淘汰第K次访问时间距当前时间最大的数据。
数据第一次被访问时,加入到历史访问列表,如果数据在访问历史列表中没有达到K次访问,则按照一定的规则(FIFO,LRU)淘汰;
当访问历史队列中的数据访问次数达到K次后,将数据索引从历史队列中删除,将数据移到缓存队列中,并缓存数据,缓存队列重新按照时间排序;
缓存数据队列中被再次访问后,重新排序,需要淘汰数据时,淘汰缓存队列中排在末尾的数据,即“淘汰倒数K次访问离现在最久的数据”。
LRU-K具有LRU的优点,同时还能避免LRU的缺点,实际应用中LRU-2是综合最优的选择。
由于LRU-K还需要记录那些被访问过、但还没有放入缓存的对象,因此内存消耗会比LRU要多。
2. two queue Two queues(以下使用2Q代替)算法类似于LRU-2,不同点在于2Q将LRU-2算法中的访问历史队列(注意这不是缓存数据的)改为一个FIFO缓存队列,即:2Q算法有两个缓存队列,一个是FIFO队列,一个是LRU队列。
当数据第一次访问时,2Q算法将数据缓存在FIFO队列里面,当数据第二次被访问时,则将数据从FIFO队列移到LRU队列里面,两个队列各自按照自己的方法淘汰数据。
新访问的数据插入到FIFO队列中,如果数据在FIFO队列中一直没有被再次访问,则最终按照FIFO规则淘汰;
如果数据在FIFO队列中再次被访问到,则将数据移到LRU队列头部,如果数据在LRU队列中再次被访问,则将数据移动LRU队列头部,LRU队列淘汰末尾的数据。
3. Multi Queue(MQ) MQ算法根据访问频率将数据划分为多个队列,不同的队列具有不同的访问优先级,其核心思想是:优先缓存访问次数多的数据 。
详细的算法结构图如下,Q0,Q1….Qk代表不同的优先级队列,Q-history代表从缓存中淘汰数据,但记录了数据的索引和引用次数的队列:
新插入的数据放入Q0,每个队列按照LRU进行管理,当数据的访问次数达到一定次数,需要提升优先级时,将数据从当前队列中删除,加入到高一级队列的头部;为了防止高优先级数据永远不会被淘汰,当数据在指定的时间里没有被访问时,需要降低优先级,将数据从当前队列删除,加入到低一级的队列头部;需要淘汰数据时,从最低一级队列开始按照LRU淘汰,每个队列淘汰数据时,将数据从缓存中删除,将数据索引加入Q-history头部。
如果数据在Q-history中被重新访问,则重新计算其优先级,移到目标队列头部。
Q-history按照LRU淘汰数据的索引。
MQ需要维护多个队列,且需要维护每个数据的访问时间,复杂度比LRU高。
4.LRU算法对比
对比点
对比
命中率
LRU-2 > MQ(2) > 2Q > LRU
复杂度
LRU-2 > MQ(2) > 2Q > LRU
代价
LRU-2 > MQ(2) > 2Q > LRU
实际上上面的几个算法,思想上大同小异。
核心目的:解决批量操作导致热点数据失效,缓存被污染的问题。
实现方式:增加一个队列,用来保存只访问一次的数据,然后根据次数不同,放入到 LRU 中。
只访问一次的队列,可以是 FIFO 队列,可以是 LRU,我们来实现一下 2Q 和 LRU-2 两种实现。
2Q算法实现 1.实现思路 实际上就是我们以前的 FIFO + LRU 二者的结合。
2.基本属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class CacheEvictLru2Q <K ,V > extends AbstractCacheEvict <K ,V > { private static final Log log = LogFactory.getLog(CacheEvictLru2Q.class); private static final int LIMIT_QUEUE_SIZE = 1024 ; private Queue<K> firstQueue; private DoubleListNode<K,V> head; private DoubleListNode<K,V> tail; private Map<K, DoubleListNode<K,V>> lruIndexMap; public CacheEvictLru2Q () { this .firstQueue = new LinkedList<>(); this .lruIndexMap = new HashMap<>(); this .head = new DoubleListNode<>(); this .tail = new DoubleListNode<>(); this .head.next(this .tail); this .tail.pre(this .head); } }
3.数据淘汰 数据淘汰的逻辑:
当缓存大小,已经达到最大限制时执行:
(1)优先淘汰 firstQueue 中的数据
(2)如果 firstQueue 中数据为空,则淘汰 lruMap 中的数据信息。
这里有一个假设:我们认为被多次访问的数据,重要性高于被只访问了一次的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Override protected ICacheEntry<K, V> doEvict (ICacheEvictContext<K, V> context) { ICacheEntry<K, V> result = null ; final ICache<K,V> cache = context.cache(); if (cache.size() >= context.size()) { K evictKey = null ; if (!firstQueue.isEmpty()) { evictKey = firstQueue.remove(); } else { DoubleListNode<K,V> tailPre = this .tail.pre(); if (tailPre == this .head) { log.error("当前列表为空,无法进行删除" ); throw new CacheRuntimeException("不可删除头结点!" ); } evictKey = tailPre.key(); } V evictValue = cache.remove(evictKey); result = new CacheEntry<>(evictKey, evictValue); } return result; }
4.数据删除 当数据被删除时调用:
这个逻辑和以前类似,只是多了一个 FIFO 队列的移除。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public void removeKey (final K key) { DoubleListNode<K,V> node = lruIndexMap.get(key); if (ObjectUtil.isNotNull(node)) { DoubleListNode<K,V> pre = node.pre(); DoubleListNode<K,V> next = node.next(); pre.next(next); next.pre(pre); this .lruIndexMap.remove(node.key()); } else { firstQueue.remove(key); } }
5.数据的更新 当数据被访问时,提升数据的优先级。
(1)如果在 lruMap 中,则首先移除,然后放入到头部
(2)如果不在 lruMap 中,但是在 FIFO 队列,则从 FIFO 队列中移除,添加到 LRU map 中。
(3)如果都不在,直接加入到 FIFO 队列中即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Override public void updateKey (final K key) { DoubleListNode<K,V> node = lruIndexMap.get(key); if (ObjectUtil.isNotNull(node) || firstQueue.contains(key)) { this .removeKey(key); this .addToLruMapHead(key); return ; } firstQueue.add(key); }
这里我想到了一个优化点,限制 firstQueue 的一直增长,因为遍历的时间复杂度为 O(n),所以限制最大的大小为 1024。
如果超过了,则把 FIFO 中的元素先移除掉。
不过只移除 FIFO,不移除 cache,会导致二者的活跃程度不一致;
如果同时移除,但是 cache 的大小还没有满足,可能会导致超出用户的预期,这个可以作为一个优化点,暂时注释掉。
6.测试 代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ICache<String, String> cache = CacheBs.<String,String>newInstance() .size(3 ) .evict(CacheEvicts.<String, String>lru2Q()) .build(); cache.put("A" , "hello" ); cache.put("B" , "world" ); cache.put("C" , "FIFO" ); cache.get("A" ); cache.put("D" , "LRU" ); Assert.assertEquals(3 , cache.size()); System.out.println(cache.keySet());
效果
1 2 [DEBUG] [2020 -10 -03 13 :15 :50.670 ] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict [D, A, C]
LRU-2算法实现 1.实现LRU简介 FIFO 中的缺点还是比较明显的,需要 O(n) 的时间复杂度做遍历。
而且命中率和 LRU-2 比起来还是会差一点。
这里 LRU map 出现了多次,我们为了方便,将 LRU map 简单的封装为一个数据结构。
我们使用双向链表+HashMap 实现一个简单版本的。
2.节点 node 节点和以前一致:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class DoubleListNode <K ,V > { private K key; private V value; private DoubleListNode<K,V> pre; private DoubleListNode<K,V> next; }
3.接口 我们根据自己的需要,暂时定义 3 个最重要的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public interface ILruMap <K ,V > { ICacheEntry<K, V> removeEldest () ; void updateKey (final K key) ; void removeKey (final K key) ; boolean isEmpty () ; boolean contains (final K key) ; }
4.实现 我们基于 DoubleLinkedList + HashMap 实现。
就是把上一节中的实现整理一下即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 public class LruMapDoubleList <K ,V > implements ILruMap <K ,V > { private static final Log log = LogFactory.getLog(LruMapDoubleList.class); private DoubleListNode<K,V> head; private DoubleListNode<K,V> tail; private Map<K, DoubleListNode<K,V>> indexMap; public LruMapDoubleList () { this .indexMap = new HashMap<>(); this .head = new DoubleListNode<>(); this .tail = new DoubleListNode<>(); this .head.next(this .tail); this .tail.pre(this .head); } @Override public ICacheEntry<K, V> removeEldest () { DoubleListNode<K,V> tailPre = this .tail.pre(); if (tailPre == this .head) { log.error("当前列表为空,无法进行删除" ); throw new CacheRuntimeException("不可删除头结点!" ); } K evictKey = tailPre.key(); V evictValue = tailPre.value(); return CacheEntry.of(evictKey, evictValue); } @Override public void updateKey (final K key) { this .removeKey(key); DoubleListNode<K,V> newNode = new DoubleListNode<>(); newNode.key(key); DoubleListNode<K,V> next = this .head.next(); this .head.next(newNode); newNode.pre(this .head); next.pre(newNode); newNode.next(next); indexMap.put(key, newNode); } @Override public void removeKey (final K key) { DoubleListNode<K,V> node = indexMap.get(key); if (ObjectUtil.isNull(node)) { return ; } DoubleListNode<K,V> pre = node.pre(); DoubleListNode<K,V> next = node.next(); pre.next(next); next.pre(pre); this .indexMap.remove(key); } @Override public boolean isEmpty () { return indexMap.isEmpty(); } @Override public boolean contains (K key) { return indexMap.containsKey(key); } }
5.基本属性 LRU 的实现保持不变。我们直接将 FIFO 替换为 LRU map 即可。
为了便于理解,我们将 FIFO 对应为 firstLruMap,用来存放用户只访问了一次的元素。
将原来的 LRU 中存入访问了 2 次及其以上的元素。
其他逻辑和 2Q 保持一致。
定义两个 LRU,用来分别存储访问的信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class CacheEvictLru2 <K ,V > extends AbstractCacheEvict <K ,V > { private static final Log log = LogFactory.getLog(CacheEvictLru2.class); private final ILruMap<K,V> firstLruMap; private final ILruMap<K,V> moreLruMap; public CacheEvictLru2 () { this .firstLruMap = new LruMapDoubleList<>(); this .moreLruMap = new LruMapDoubleList<>(); } }
6.淘汰实现 和 lru 2Q 模式类似,这里我们优先淘汰 firstLruMap 中的数据信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override protected ICacheEntry<K, V> doEvict (ICacheEvictContext<K, V> context) { ICacheEntry<K, V> result = null ; final ICache<K,V> cache = context.cache(); if (cache.size() >= context.size()) { ICacheEntry<K,V> evictEntry = null ; if (!firstLruMap.isEmpty()) { evictEntry = firstLruMap.removeEldest(); log.debug("从 firstLruMap 中淘汰数据:{}" , evictEntry); } else { evictEntry = moreLruMap.removeEldest(); log.debug("从 moreLruMap 中淘汰数据:{}" , evictEntry); } final K evictKey = evictEntry.key(); V evictValue = cache.remove(evictKey); result = new CacheEntry<>(evictKey, evictValue); } return result; }
7.删除 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Override public void removeKey (final K key) { if (moreLruMap.contains(key)) { moreLruMap.removeKey(key); log.debug("key: {} 从 moreLruMap 中移除" , key); } else { firstLruMap.removeKey(key); log.debug("key: {} 从 firstLruMap 中移除" , key); } }
8.更新 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Override public void updateKey (final K key) { if (moreLruMap.contains(key) || firstLruMap.contains(key)) { this .removeKey(key); moreLruMap.updateKey(key); log.debug("key: {} 多次访问,加入到 moreLruMap 中" , key); } else { firstLruMap.updateKey(key); log.debug("key: {} 为第一次访问,加入到 firstLruMap 中" , key); } }
实际上使用 LRU-2 的代码逻辑反而变得清晰了一些,主要是因为我们把 lruMap 作为独立的数据结构抽离了出去
9.测试 代码
1 2 3 4 5 6 7 8 9 10 11 12 ICache<String, String> cache = CacheBs.<String,String>newInstance() .size(3 ) .evict(CacheEvicts.<String, String>lru2Q()) .build(); cache.put("A" , "hello" ); cache.put("B" , "world" ); cache.put("C" , "FIFO" ); cache.get("A" ); cache.put("D" , "LRU" ); Assert.assertEquals(3 , cache.size()); System.out.println(cache.keySet());
日志
为了便于定位分析,源代码实现的时候,加了一点日志。
1 2 3 4 5 6 7 8 9 [DEBUG] [2020 -10 -03 14 :39 :04.966 ] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: A 为第一次访问,加入到 firstLruMap 中 [DEBUG] [2020 -10 -03 14 :39 :04.967 ] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: B 为第一次访问,加入到 firstLruMap 中 [DEBUG] [2020 -10 -03 14 :39 :04.968 ] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: C 为第一次访问,加入到 firstLruMap 中 [DEBUG] [2020 -10 -03 14 :39 :04.970 ] [main] [c.g.h.c.c.s.e.CacheEvictLru2.removeKey] - key: A 从 firstLruMap 中移除 [DEBUG] [2020 -10 -03 14 :39 :04.970 ] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: A 多次访问,加入到 moreLruMap 中 [DEBUG] [2020 -10 -03 14 :39 :04.972 ] [main] [c.g.h.c.c.s.e.CacheEvictLru2.doEvict] - 从 firstLruMap 中淘汰数据:EvictEntry{key=B, value=null } [DEBUG] [2020 -10 -03 14 :39 :04.974 ] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict [DEBUG] [2020 -10 -03 14 :39 :04.974 ] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: D 为第一次访问,加入到 firstLruMap 中 [D, A, C]
10.小结 对于 LRU 算法的改进我们主要做了两点:
(1)性能的改进,从 O(N) 优化到 O(1)
(2)批量操作的改进,避免缓存污染
四.过期功能的实现 缓存接口定义 我们首先来定义一下接口。
主要有两个:一个是多久之后过期,一个是在什么时候过期。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public interface ICache <K , V > extends Map <K , V > { ICache<K, V> expire (final K key, final long timeInMills) ; ICache<K, V> expireAt (final K key, final long timeInMills) ; }
缓存接口实现 为了便于处理,我们将多久之后过期,进行计算。将两个问题变成同一个问题,在什么时候过期的问题。
核心的代码,主要还是看 cacheExpire 接口。
1 2 3 4 5 6 7 8 9 10 11 @Override public ICache<K, V> expire (K key, long timeInMills) { long expireTime = System.currentTimeMillis() + timeInMills; return this .expireAt(key, expireTime); } @Override public ICache<K, V> expireAt (K key, long timeInMills) { this .cacheExpire.expire(key, timeInMills); return this ; }
缓存过期接口 这里为了便于后期拓展,对于过期的处理定义为接口,便于后期灵活替换
其中 expire(final K key, final long expireAt);
就是我们方法中调用的地方。
refershExpire 属于惰性删除,需要进行刷新时才考虑,我们后面讲解。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public interface ICacheExpire <K ,V > { void expire (final K key, final long expireAt) ; void refreshExpire (final Collection<K> keyList) ; }
expire 实现原理 其实过期的实思路也比较简单:我们可以开启一个定时任务,比如 1 秒钟做一次轮训,将过期的信息清空。
1.过期信息的存储 1 2 3 4 5 6 7 8 9 10 11 private final Map<K, Long> expireMap = new HashMap<>();@Override public void expire (K key, long expireAt) { expireMap.put(key, expireAt); }
我们定义一个 map,key 是对应的要过期的信息,value 存储的是过期时间。
2.轮询清理 我们固定 100ms 清理一次,每次最多清理 100 个。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private static final int LIMIT = 100 ;private final ICache<K,V> cache;private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();public CacheExpire (ICache<K, V> cache) { this .cache = cache; this .init(); } private void init () { EXECUTOR_SERVICE.scheduleAtFixedRate(new ExpireThread(), 100 , 100 , TimeUnit.MILLISECONDS); }
这里定义了一个单线程,用于执行清空任务。
3.清空任务 这个非常简单,遍历过期数据,判断对应的时间,如果已经到期了,则执行清空操作。
为了避免单次执行时间过长,最多只处理 100 条
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private class ExpireThread implements Runnable { @Override public void run () { if (MapUtil.isEmpty(expireMap)) { return ; } int count = 0 ; for (Map.Entry<K, Long> entry : expireMap.entrySet()) { if (count >= LIMIT) { return ; } expireKey(entry); count++; } } } private void expireKey (Map.Entry<K, Long> entry) { final K key = entry.getKey(); final Long expireAt = entry.getValue(); long currentTime = System.currentTimeMillis(); if (currentTime >= expireAt) { expireMap.remove(key); cache.remove(key); } }
惰性删除 1.出现的原因 类似于 redis,我们采用定时删除的方案,就有一个问题:可能数据清理的不及时。
那当我们查询时,可能获取到到是脏数据。
于是就有一些人就想了,当我们关心某些数据时,才对数据做对应的删除判断操作,这样压力会小很多。
算是一种折中方案。
2.需要惰性删除的方法 一般就是各种查询方法,比如我们获取 key 对应的值时
1 2 3 4 5 6 7 8 @Override @SuppressWarnings("unchecked") public V get (Object key) { K genericKey = (K) key; this .cacheExpire.refreshExpire(Collections.singletonList(genericKey)); return map.get(key); }
我们在获取之前,先做一次数据的刷新。
3.刷新的实现 实现原理也非常简单,就是一个循环,然后作删除即可。
这里加了一个小的优化:选择数量少的作为外循环。
循环集合的时间复杂度是 O(n), map.get() 的时间复杂度是 O(1);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override public void refreshExpire (Collection<K> keyList) { if (CollectionUtil.isEmpty(keyList)) { return ; } if (keyList.size() <= expireMap.size()) { for (K key : keyList) { expireKey(key); } } else { for (Map.Entry<K, Long> entry : expireMap.entrySet()) { this .expireKey(entry); } } }
测试 上面的代码写完之后,我们就可以验证一下了。
1 2 3 4 5 6 7 8 9 10 11 12 13 ICache<String, String> cache = CacheBs.<String,String>newInstance() .size(3 ) .build(); cache.put("1" , "1" ); cache.put("2" , "2" ); cache.expire("1" , 10 ); Assert.assertEquals(2 , cache.size()); TimeUnit.MILLISECONDS.sleep(50 ); Assert.assertEquals(1 , cache.size()); System.out.println(cache.keySet());
结果也符合我们的预期
五.过期功能的优化 上面的过期功能的实现中存在两个问题:
(1)keys 的选择不够随机,可能会导致每次循环 100 个结束时,真正需要过期的没有被遍历到
(2)keys 的遍历可能大部分都是无效的
所以下面以过期时间为维度对过期功能进行优化
基于时间的遍历 1.思路 我们换一种思路,让过期的时间做 key,相同时间的需要过期的信息放在一个列表中,作为 value
然后对过期时间排序,轮询的时候就可以快速判断出是否有过期的信息了。
我们每次 put 放入过期元素时,根据过期时间对元素进行排序,相同的过期时间的 Keys 放在一起。
优点:定时遍历的时候,如果时间不到当前时间,就可以直接返回了,大大降低无效遍历。
缺点:考虑到惰性删除问题,还是需要存储以删除信息作为 key 的 map 关系,这样内存基本翻倍。
2.基本属性定义 我们这里使用 TreeMap
帮助我们进行过期时间的排序,这个集合后续有时间可以详细讲解了,我大概看了下 jdk1.8 的源码,主要是通过红黑树实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class CacheExpireSort <K ,V > implements ICacheExpire <K ,V > { private static final int LIMIT = 100 ; private final Map<Long, List<K>> sortMap = new TreeMap<>(new Comparator<Long>() { @Override public int compare (Long o1, Long o2) { return (int ) (o1 - o2); } }); private final Map<K, Long> expireMap = new HashMap<>(); private final ICache<K,V> cache; }
3.放入元素时 每次存入新元素时,同时放入 sortMap 和 expireMap。
1 2 3 4 5 6 7 8 9 10 11 @Override public void expire (K key, long expireAt) { List<K> keys = sortMap.get(expireAt); if (keys == null ) { keys = new ArrayList<>(); } keys.add(key); sortMap.put(expireAt, keys); expireMap.put(key, expireAt); }
定时任务的执行 1.定义 我们定义一个定时任务,100ms 执行一次。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();public CacheExpireSort (ICache<K, V> cache) { this .cache = cache; this .init(); } private void init () { EXECUTOR_SERVICE.scheduleAtFixedRate(new ExpireThread(), 100 , 100 , TimeUnit.MILLISECONDS); }
2.执行任务 实现源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 private class ExpireThread implements Runnable { @Override public void run () { if (MapUtil.isEmpty(sortMap)) { return ; } int count = 0 ; for (Map.Entry<Long, List<K>> entry : sortMap.entrySet()) { final Long expireAt = entry.getKey(); List<K> expireKeys = entry.getValue(); if (CollectionUtil.isEmpty(expireKeys)) { sortMap.remove(expireAt); continue ; } if (count >= LIMIT) { return ; } long currentTime = System.currentTimeMillis(); if (currentTime >= expireAt) { Iterator<K> iterator = expireKeys.iterator(); while (iterator.hasNext()) { K key = iterator.next(); iterator.remove(); expireMap.remove(key); cache.remove(key); count++; } } else { return ; } } } }
这里直接遍历 sortMap,对应的 key 就是过期时间,然后和当前时间对比即可。
删除的时候,需要删除 expireMap/sortMap/cache。
惰性删除刷新 惰性删除刷新时,就会用到 expireMap。
因为有时候刷新的 key 就一个,如果没有 expireMap 映射关系,可能要把 sortMap 全部遍历一遍才能找到对应的过期时间。
就是一个时间复杂度与空间复杂度衡量的问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Override public void refreshExpire (Collection<K> keyList) { if (CollectionUtil.isEmpty(keyList)) { return ; } final int expireSize = expireMap.size(); if (expireSize <= keyList.size()) { for (Map.Entry<K,Long> entry : expireMap.entrySet()) { K key = entry.getKey(); this .removeExpireKey(key); } } else { for (K key : keyList) { this .removeExpireKey(key); } } } private void removeExpireKey (final K key) { Long expireTime = expireMap.get(key); if (expireTime != null ) { final long currentTime = System.currentTimeMillis(); if (currentTime >= expireTime) { expireMap.remove(key); List<K> expireKeys = sortMap.get(expireTime); expireKeys.remove(key); sortMap.put(expireTime, expireKeys); } } }
六.RBD持久化功能的实现 缓存的持久化功能分为以下两个部分:
Cache 的内容持久化到文件或者数据库
初始化的时候加载持久化数据
持久化操作 1.持久化操作接口 为了便于灵活替换,我们定义一个持久化的接口。
1 2 3 4 5 6 7 8 9 public interface ICachePersist <K , V > { void persist (final ICache<K, V> cache) ; }
2.持久化操作接口实现 我们实现一个最简单的基于 json 的持久化,当然后期可以添加类似于 AOF 的持久化模式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class CachePersistDbJson <K ,V > implements ICachePersist <K ,V > { private final String dbPath; public CachePersistDbJson (String dbPath) { this .dbPath = dbPath; } @Override public void persist (ICache<K, V> cache) { Set<Map.Entry<K,V>> entrySet = cache.entrySet(); FileUtil.createFile(dbPath); FileUtil.truncate(dbPath); for (Map.Entry<K,V> entry : entrySet) { K key = entry.getKey(); Long expireTime = cache.expire().expireTime(key); PersistEntry<K,V> persistEntry = new PersistEntry<>(); persistEntry.setKey(key); persistEntry.setValue(entry.getValue()); persistEntry.setExpire(expireTime); String line = JSON.toJSONString(persistEntry); FileUtil.write(dbPath, line, StandardOpenOption.APPEND); } } }
3.持久化操作的触发 上面定义好了一种持久化的策略,但是没有提供对应的触发方式。
我们就采用对用户透明的设计方式:定时执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class InnerCachePersist <K ,V > { private static final Log log = LogFactory.getLog(InnerCachePersist.class); private final ICache<K,V> cache; private final ICachePersist<K,V> persist; private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(); public InnerCachePersist (ICache<K, V> cache, ICachePersist<K, V> persist) { this .cache = cache; this .persist = persist; this .init(); } private void init () { EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() { @Override public void run () { try { log.info("开始持久化缓存信息" ); persist.persist(cache); log.info("完成持久化缓存信息" ); } catch (Exception exception) { log.error("文件持久化异常" , exception); } } }, 0 , 10 , TimeUnit.MINUTES); } }
定时执行的时间间隔为 10min。
4.测试 我们只需要在创建 Cache 时,指定我们的持久化策略即可。
1 2 3 4 5 6 ICache<String, String> cache = CacheBs.<String,String>newInstance() .load(new MyCacheLoad()) .persist(CachePersists.<String, String>dbJson("1.rdb" )) .build(); Assert.assertEquals(2 , cache.size()); TimeUnit.SECONDS.sleep(5 );
为了确保文件持久化完成,我们沉睡了一会儿。
测试结果如下
生成1.rdb文件内容如下:
1 2 {"key" :"2" ,"value" :"2" } {"key" :"1" ,"value" :"1" }
缓存数据的加载 1.缓存初始化接口 缓存初始化即从持久化中提取数据到缓存内存中
为了便于后期拓展,定义 ICacheLoad 接口。
1 2 3 4 5 6 7 8 9 public interface ICacheLoad <K , V > { void load (final ICache<K,V> cache) ; }
2.缓存初始化接口实现 我们只需要实现以下对应的加载即可,解析文件,然后初始化 cache。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class CacheLoadDbJson <K ,V > implements ICacheLoad <K ,V > { private static final Log log = LogFactory.getLog(CacheLoadDbJson.class); private final String dbPath; public CacheLoadDbJson (String dbPath) { this .dbPath = dbPath; } @Override public void load (ICache<K, V> cache) { List<String> lines = FileUtil.readAllLines(dbPath); log.info("[load] 开始处理 path: {}" , dbPath); if (CollectionUtil.isEmpty(lines)) { log.info("[load] path: {} 文件内容为空,直接返回" , dbPath); return ; } for (String line : lines) { if (StringUtil.isEmpty(line)) { continue ; } PersistEntry<K,V> entry = JSON.parseObject(line, PersistEntry.class); K key = entry.getKey(); V value = entry.getValue(); Long expire = entry.getExpire(); cache.put(key, value); if (ObjectUtil.isNotNull(expire)) { cache.expireAt(key, expire); } } } }
然后在初始化时使用即可。
七.AOF持久化功能的实现 Redis AOF 解析 1.为什么选择 AOF? AOF 模式的性能特别好 :
用过 kafka 的同学肯定知道,kafka 也用到了顺序写这个特性。
顺序写添加文件内容,避免了文件 IO 的随机写问题,性能基本可以和内存媲美。
AOF 的实时性更好 ,这个是相对于 RDB 模式而言的:
我们原来使用 RDB 模式,将缓存内容全部持久化,这个是比较耗时的动作,一般是几分钟持久化一次。
AOF 模式主要是针对修改内容的指令,然后将所有的指令顺序添加到文件中。这样的话,实时性会好很多,可以提升到秒级别,甚至秒级别。可以将AOF模式理解为一个操作流水表
2.AOF 的吞吐量 AOF 模式可以每次操作都进行持久化,但是这样会导致吞吐量大大下降。
提升吞吐量最常用的方式就是批量 ,这个 kafka 中也是类似的,比如我们可以 1s 持久化一次,将 1s 内的操作全部放入 buffer 中。
这里其实就是一个 trade-off 问题,实时性与吞吐量的平衡艺术。
实际业务中,1s 的误差一般都是可以接受的,所以这个也是业界比较认可的方式。
3.AOF 的异步+多线程 kafka 中所有的操作实际上都是异步+回调的方式实现的。
异步+多线程,确实可以提升操作的性能。
当然 redis 6 以前,其实一直是单线程的。那为什么性能依然这么好呢?
其实多线程也有代价,那就是线程上下文的切换是需要耗时的,保持并发的安全问题,也需要加锁,从而降低性能。
所以这里要考虑异步的收益,与付出的耗时是否成正比的问题。
4.AOF 的落盘 我们 AOF 与 RDB 模式,归根结底都是基于操作系统的文件系统做持久化的。
对于开发者而言,可能就是调用一个 api 就实现了,但是实际持久化落盘的动作并不见得就是一步完成的。
文件系统为了提升吞吐量,也会采用类似 buffer 的方式。这忽然有一点俄罗斯套娃的味道。
但是优秀的设计总是相似的,比如说缓存从 cpu 的设计中就有 L1/L2 等等,思路是一致的。
阿里的很多开源技术,都会针对操作系统的落盘做进一步的优化,这个我们后续做深入学习
5.AOF 的缺陷 大道缺一,没有银弹。
AOF 千好万好,和 RDB 对比也存在一个缺陷,那就是指令
AOF注解实现 1.接口 接口和 rdb 的保持一致
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public interface ICachePersist <K , V > { void persist (final ICache<K, V> cache) ; }
2.注解定义 为了和耗时统计,刷新等特性保持一致,对于操作类的动作才添加到文件中(append to file)我们也基于注解属性来指定,而不是固定写死在代码中,便于后期拓展调整
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Documented @Inherited @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface CacheInterceptor { boolean aof () default false ; }
我们在原来的 @CacheInterceptor
注解中添加 aof 属性,用于指定是否对操作开启 aof 模式
3.过期操作中启用AOF 类似于 spring 的事务拦截器,我们使用代理类调用 expireAt。
expire 方法就不需要添加 aof 拦截了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Override @CacheInterceptor public ICache<K, V> expire (K key, long timeInMills) { long expireTime = System.currentTimeMillis() + timeInMills; Cache<K,V> cachePoxy = (Cache<K, V>) CacheProxy.getProxy(this ); return cachePoxy.expireAt(key, expireTime); } @Override @CacheInterceptor(aof = true) public ICache<K, V> expireAt (K key, long timeInMills) { this .expire.expire(key, timeInMills); return this ; }
4.变更操作中启用AOF 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Override @CacheInterceptor(aof = true) public V put (K key, V value) { CacheEvictContext<K,V> context = new CacheEvictContext<>(); context.key(key).size(sizeLimit).cache(this ); boolean evictResult = evict.evict(context); if (evictResult) { ICacheRemoveListenerContext<K,V> removeListenerContext = CacheRemoveListenerContext.<K,V>newInstance().key(key).value(value).type(CacheRemoveType.EVICT.code()); for (ICacheRemoveListener<K,V> listener : this .removeListeners) { listener.listen(removeListenerContext); } } if (isSizeLimit()) { throw new CacheRuntimeException("当前队列已满,数据添加失败!" ); } return map.put(key, value); } @Override @CacheInterceptor(aof = true) public V remove (Object key) { return map.remove(key); } @Override @CacheInterceptor(aof = true) public void putAll (Map<? extends K, ? extends V> m) { map.putAll(m); } @Override @CacheInterceptor(refresh = true, aof = true) public void clear () { map.clear(); }
AOF 持久化拦截实现 1.持久化对象定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class PersistAofEntry { private Object[] params; private String methodName;( }
这里我们只需要方法名,和参数对象。
暂时实现的简单一些即可
2.持久化拦截器 我们定义拦截器,当 cache 中定义的持久化类为 CachePersistAof
时,将操作的信息放入到 CachePersistAof 的 buffer 列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class CacheInterceptorAof <K ,V > implements ICacheInterceptor <K , V > { private static final Log log = LogFactory.getLog(CacheInterceptorAof.class); @Override public void before (ICacheInterceptorContext<K,V> context) { } @Override public void after (ICacheInterceptorContext<K,V> context) { ICache<K,V> cache = context.cache(); ICachePersist<K,V> persist = cache.persist(); if (persist instanceof CachePersistAof) { CachePersistAof<K,V> cachePersistAof = (CachePersistAof<K,V>) persist; String methodName = context.method().getName(); PersistAofEntry aofEntry = PersistAofEntry.newInstance(); aofEntry.setMethodName(methodName); aofEntry.setParams(context.params()); String json = JSON.toJSONString(aofEntry); log.debug("AOF 开始追加文件内容:{}" , json); cachePersistAof.append(json); log.debug("AOF 完成追加文件内容:{}" , json); } } }
3.拦截器调用 当 AOF 的注解属性为 true 时,调用上述拦截器即可。
这里为了避免浪费,只有当持久化类为 AOF 模式时,才进行调用。
1 2 3 4 5 6 7 8 9 final ICachePersist cachePersist = cache.persist();if (cacheInterceptor.aof() && (cachePersist instanceof CachePersistAof)) { if (before) { persistInterceptors.before(interceptorContext); } else { persistInterceptors.after(interceptorContext); } }
AOF持久化实现 这里的 AOF 模式和以前的 RDB 持久化类只是不同的模式,实际上二者是相同的接口。
1.接口 这里我们统一定义了不同的持久化类的时间,便于 RDB 与 AOF 不同任务的不同时间间隔触发。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public interface ICachePersist <K , V > { void persist (final ICache<K, V> cache) ; long delay () ; long period () ; TimeUnit timeUnit () ; }
2.持久化类实现 实现一个 Buffer 列表,用于每次拦截器直接顺序添加
持久化的实现也比较简单,追加到文件之后,直接清空 buffer 列表即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public class CachePersistAof <K ,V > extends CachePersistAdaptor <K ,V > { private static final Log log = LogFactory.getLog(CachePersistAof.class); private final List<String> bufferList = new ArrayList<>(); private final String dbPath; public CachePersistAof (String dbPath) { this .dbPath = dbPath; } @Override public void persist (ICache<K, V> cache) { log.info("开始 AOF 持久化到文件" ); if (!FileUtil.exists(dbPath)) { FileUtil.createFile(dbPath); } FileUtil.append(dbPath, bufferList); bufferList.clear(); log.info("完成 AOF 持久化到文件" ); } @Override public long delay () { return 1 ; } @Override public long period () { return 1 ; } @Override public TimeUnit timeUnit () { return TimeUnit.SECONDS; } public void append (final String json) { if (StringUtil.isNotEmpty(json)) { bufferList.add(json); } } }
持久化测试 1.测试代码 1 2 3 4 5 6 7 ICache<String, String> cache = CacheBs.<String,String>newInstance() .persist(CachePersists.<String, String>aof("1.aof" )) .build(); cache.put("1" , "1" ); cache.expire("1" , 10 ); cache.remove("2" ); TimeUnit.SECONDS.sleep(1 );
2.测试日志 expire 实际上调用的是 expireAt。
1 2 3 4 5 6 7 8 9 10 11 [DEBUG] [2020 -10 -02 12 :20 :41.979 ] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 开始追加文件内容:{"methodName" :"put" ,"params" :["1" ,"1" ]} [DEBUG] [2020 -10 -02 12 :20 :41.980 ] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件内容:{"methodName" :"put" ,"params" :["1" ,"1" ]} [DEBUG] [2020 -10 -02 12 :20 :41.982 ] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 开始追加文件内容:{"methodName" :"expireAt" ,"params" :["1" ,1601612441990 ]} [DEBUG] [2020 -10 -02 12 :20 :41.982 ] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件内容:{"methodName" :"expireAt" ,"params" :["1" ,1601612441990 ]} [DEBUG] [2020 -10 -02 12 :20 :41.984 ] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 开始追加文件内容:{"methodName" :"remove" ,"params" :["2" ]} [DEBUG] [2020 -10 -02 12 :20 :41.984 ] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件内容:{"methodName" :"remove" ,"params" :["2" ]} [DEBUG] [2020 -10 -02 12 :20 :42.088 ] [pool-1 -thread-1 ] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: 1 , value: 1 , type: expire [INFO] [2020 -10 -02 12 :20 :42.789 ] [pool-2 -thread-1 ] [c.g.h.c.c.s.p.InnerCachePersist.run] - 开始持久化缓存信息 [INFO] [2020 -10 -02 12 :20 :42.789 ] [pool-2 -thread-1 ] [c.g.h.c.c.s.p.CachePersistAof.persist] - 开始 AOF 持久化到文件 [INFO] [2020 -10 -02 12 :20 :42.798 ] [pool-2 -thread-1 ] [c.g.h.c.c.s.p.CachePersistAof.persist] - 完成 AOF 持久化到文件 [INFO] [2020 -10 -02 12 :20 :42.799 ] [pool-2 -thread-1 ] [c.g.h.c.c.s.p.InnerCachePersist.run] - 完成持久化缓存信息
3.文件内容 1.aof
的文件内容如下
1 2 3 {"methodName" :"put" ,"params" :["1" ,"1" ]} {"methodName" :"expireAt" ,"params" :["1" ,1601612441990 ]} {"methodName" :"remove" ,"params" :["2" ]}
将每一次的操作,简单的存储到文件中
AOF 加载实现 1.加载 类似于 RDB 的加载模式,aof 的加载模式也是类似的。
我们需要根据文件的内容,还原以前的缓存的内容。
实现思路:遍历文件内容,反射调用原来的方法。
2.代码实现 解析文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override public void load (ICache<K, V> cache) { List<String> lines = FileUtil.readAllLines(dbPath); log.info("[load] 开始处理 path: {}" , dbPath); if (CollectionUtil.isEmpty(lines)) { log.info("[load] path: {} 文件内容为空,直接返回" , dbPath); return ; } for (String line : lines) { if (StringUtil.isEmpty(line)) { continue ; } PersistAofEntry entry = JSON.parseObject(line, PersistAofEntry.class); final String methodName = entry.getMethodName(); final Object[] objects = entry.getParams(); final Method method = METHOD_MAP.get(methodName); ReflectMethodUtil.invoke(cache, method, objects); } }
方法映射的预加载
Method 反射是固定的,为了提升性能,我们做一下预处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private static final Map<String, Method> METHOD_MAP = new HashMap<>();static { Method[] methods = Cache.class.getMethods(); for (Method method : methods){ CacheInterceptor cacheInterceptor = method.getAnnotation(CacheInterceptor.class); if (cacheInterceptor != null ) { if (cacheInterceptor.aof()) { String methodName = method.getName(); METHOD_MAP.put(methodName, method); } } } }
持久化加载测试 1.文件内容
1 {"methodName" :"put" ,"params" :["1" ,"1" ]}
2.测试 1 2 3 4 5 6 ICache<String, String> cache = CacheBs.<String,String>newInstance() .load(CacheLoads.<String, String>aof("default.aof" )) .build(); Assert.assertEquals(1 , cache.size()); System.out.println(cache.keySet());
直接将 default.aof 文件加载到 cache 缓存中
八.监听器的开发 下面我们将一起学习一下如何实现类似 guava-cache 中的 removeListener 删除监听器,和类似 redis 中的慢日志监控的 slowListener
删除监听器 :将数据驱除或过期时删除的数据记录打印到删除日志中
慢操作监听器 :当操作变慢时,将警告信息或报警信息打印到慢日志中
删除监听器的开发 1.基本介绍 我们在两种场景下删除数据是对用户透明的:
(1)size 满了之后,进行数据淘汰。
(2)expire 过期时,清除数据。
这两个特性对用户本来应该是无感的,不过用户如果关心的话,也可以通过添加删除监听器来获取到相关的变更信息
2.实现思路 为了实现删除的监听,我们需要找到删除的位置,然后调用监听器即可
每次 put 数据时,都会校验 size 是否达到最大的限制,如果达到,则进行 evict 淘汰
用户指定 expire 时间之后,回后台异步执行刷新
也存在惰性删除的场景
3.接口定义 为了统一,我们将所有的删除都定义统一的接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public interface ICacheRemoveListener <K ,V > { void listen (final ICacheRemoveListenerContext<K,V> context) ; }
4.内置实现 系统内置的实现如下:
1 2 3 4 5 6 7 8 9 10 11 public class CacheRemoveListener <K ,V > implements ICacheRemoveListener <K ,V > { private static final Log log = LogFactory.getLog(CacheRemoveListener.class); @Override public void listen (ICacheRemoveListenerContext<K, V> context) { log.debug("Remove key: {}, value: {}, type: {}" , context.key(), context.value(), context.type()); } }
这个监听器是默认开启的,暂时无法关闭
5.自定义 用户可以自己的需要,进行自定义实现:
1 2 3 4 5 6 7 8 public class MyRemoveListener <K ,V > implements ICacheRemoveListener <K ,V > { @Override public void listen (ICacheRemoveListenerContext<K, V> context) { System.out.println("【删除提示】可恶,我竟然被删除了!" + context.key()); } }
6.使用测试 1 2 3 4 5 6 7 ICache<String, String> cache = CacheBs.<String,String>newInstance() .size(1 ) .addRemoveListener(new MyRemoveListener<String, String>()) .build(); cache.put("1" , "1" ); cache.put("2" , "2" );
我们指定 cache 的大小为1,设置我们自定义的删除监听器
这里的删除监听器可以添加多个
7.测试结果 测试日志如下:
1 2 [DEBUG] [2020 -09-30 19 :32 :54.617 ] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: 2 , value: 2 , type: evict 【删除提示】可恶,我竟然被删除了!2
慢操作监听器开发 1.说明 redis 中会存储慢操作的相关日志信息,主要是由两个参数构成:
(1)slowlog-log-slower-than 预设阈值,它的单位是毫秒(1秒=1000000微秒)默认值是10000
(2)slowlog-max-len 最多存储多少条的慢日志记录
不过 redis 是直接存储到内存中,而且有长度限制。
根据实际工作体验,如果我们可以添加慢日志的监听,然后有对应的存储或者报警,这样更加方便问题的分析和快速反馈。
所以我们引入类似于删除的监听器。
2.实现思路 我们处理所有的 cache 操作,并且记录对应的操作耗时。
如果耗时操作用户设置的时间阈值,则调用慢操作监听器。
3.接口定义 为了保证接口的灵活性,每一个实现都可以定义自己的慢操作阈值,这样便于分级处理。
比如超过 100ms,用户可以选择输出 warn 日志;超过 1s,可能影响到业务了,可以直接接入报警系统。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public interface ICacheSlowListener { void listen (final ICacheSlowListenerContext context) ; long slowerThanMills () ; }
4.自定义监听器 实现接口 ICacheSlowListener
这里每一个监听器都可以指定自己的慢日志阈值,便于分级处理
1 2 3 4 5 6 7 8 9 10 11 12 13 public class MySlowListener implements ICacheSlowListener { @Override public void listen (ICacheSlowListenerContext context) { System.out.println("【慢日志】name: " + context.methodName()); } @Override public long slowerThanMills () { return 0 ; } }
5.使用测试 1 2 3 4 5 6 ICache<String, String> cache = CacheBs.<String,String>newInstance() .addSlowListener(new MySlowListener()) .build(); cache.put("1" , "2" ); cache.get("1" );
6.测试结果 1 2 3 4 5 6 [DEBUG] [2020 -09-30 17 :40 :11.547 ] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.before] - Cost start, method: put [DEBUG] [2020 -09-30 17 :40 :11.551 ] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.after] - Cost end, method: put, cost: 10ms 【慢日志】name: put [DEBUG] [2020 -09-30 17 :40 :11.554 ] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.before] - Cost start, method: get [DEBUG] [2020 -09-30 17 :40 :11.554 ] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.after] - Cost end, method: get, cost: 1ms 【慢日志】name: get
实际工作中,我们可以针对慢日志数据存储,便于后期分析。
也可以直接接入报警系统,及时反馈问题
九.Redis 渐进式 Rehash详解 HashMap 简介 1.HashMap 的 rehash 读过 HashMap 源码的同学,应该都知道 map 在扩容的时候,有一个 rehash 的过程
2.HashMap 的扩容简介 这里简单介绍下:
扩容(resize)就是重新计算容量,向HashMap对象里不停的添加元素,而HashMap对象内部的数组无法装载更多的元素时,对象就需要扩大数组的长度,以便能装入更多的元素
当然Java里的数组是无法自动扩容的,方法是使用一个新的数组代替已有的容量小的数组,就像我们用一个小桶装水,如果想装更多的水,就得换大水桶
Redis 中的扩容设计 HashMap 的扩容需要对集合中大部分的元素进行重新计算,但是对于 redis 这种企业级应用,特别是单线程的应用,如果像传统的 rehash 一样把所有元素来一遍的话,估计要十几秒的时间。
十几秒对于常见的金融、电商等相对高并发的业务场景,是无法忍受的。
那么 redis 的 rehash 是如何实现的呢?
实际上 redis 的 rehash 动作并不是一次性、集中式地完成的, 而是分多次、渐进式地完成的 。
这里补充一点,不单单是扩容,缩容也是一样的道理,二者都需要进行 rehash。
只增不降就是对内存的浪费,浪费就是犯罪,特别是内存还这么贵。
ps: 这种思想和 key 淘汰有异曲同工之妙,一口吃不了一个大胖子,一次搞不定,那就 1024 次,慢慢来总能解决问题
Redis 的渐进式 rehash 这部分直接选自经典入门书籍《Redis 设计与实现》
1.为什么要渐进式处理? 实际上 redis 内部有两个 hashtable,我们称之为 ht[0] 和 ht[1]。传统的 HashMap 中只有一个。
为了避免 rehash 对服务器性能造成影响, 服务器不是一次性将 ht[0] 里面的所有键值对全部 rehash 到 ht[1] , 而是分多次、渐进式地将 ht[0] 里面的键值对慢慢地 rehash 到 ht[1] 。
2.详细步骤 哈希表渐进式 rehash 的详细步骤:
(1)为 ht[1] 分配空间, 让字典同时持有 ht[0] 和 ht[1] 两个哈希表。
(2)在字典中维持一个索引计数器变量 rehashidx , 并将它的值设置为 0 , 表示 rehash 工作正式开始。
(3)在 rehash 进行期间, 每次对字典执行添加、删除、查找或者更新操作时, 程序除了执行指定的操作以外, 还会顺带将 ht[0] 哈希表在 rehashidx 索引上的所有键值对 rehash 到 ht[1] , 当 rehash 工作完成之后, 程序将 rehashidx 属性的值增1。
(4)随着字典操作的不断执行, 最终在某个时间点上, ht[0] 的所有键值对都会被 rehash 至 ht[1] , 这时程序将 rehashidx 属性的值设为 -1 , 表示 rehash 操作已完成。
渐进式 rehash 的好处在于它采取分而治之的方式, 将 rehash 键值对所需的计算工作均滩到对字典的每个添加、删除、查找和更新操作上, 从而避免了集中式 rehash 而带来的庞大计算量。
3.rehash 间的操作怎么兼容呢? 因为在进行渐进式 rehash 的过程中, 字典会同时使用 ht[0] 和 ht[1] 两个哈希表, 那这期间的操作如何保证正常进行呢?
(1)查询一个信息
这个类似于我们的数据库信息等迁移,先查询一个库,没有的话,再去查询另一个库。
ht[0] 中没找到,我们去 ht[1] 中查询即可。
(2)新数据怎么办?
这个和数据迁移一样的道理。
当我们有新旧的两个系统时,新来的用户等信息直接落在新系统即可,
这一措施保证了 ht[0] 包含的键值对数量会只减不增, 并随着 rehash 操作的执行而最终变成空表。
扩容 1.什么时候判断? redis 在每次执行 put 操作的时候,就可以检查是否需要扩容。
其实也很好理解,put 插入元素的时候,判断是否需要扩容,然后开始扩容,是直接的一种思路。
留一个思考题:我们可以在其他的时候判断吗?
2.redis 判断是否需要扩容的源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 static int _dictExpandIfNeeded (dict *d) { if (dictIsRehashing(d)) return DICT_OK; if (d->ht[0 ].size == 0 ) return dictExpand(d, DICT_HT_INITIAL_SIZE); if (d->ht[0 ].used >= d->ht[0 ].size && (dict_can_resize || d->ht[0 ].used/d->ht[0 ].size > dict_force_resize_ratio)) { return dictExpand(d, d->ht[0 ].used*2 ); } return DICT_OK; }
扩容的条件总结下来就是两句话:
(1)服务器目前没有在执行 BGSAVE/BGREWRITEAOF 命令, 并且哈希表的负载因子大于等于 1;
(2)服务器目前正在执行 BGSAVE/BGREWRITEAOF 命令, 并且哈希表的负载因子大于等于 5;
这里其实体现了作者的一种设计思想:如果负载因子超过5,说明信息已经很多了,管你在不在保存,都要执行扩容,优先保证服务可用性。如果没那么高,那就等持久化完成再做 rehash。
我们自己在实现的时候可以简化一下,比如只考虑情况2。
3.扩容到原来的多少? 知道了什么时候应该开始扩容,但是要扩容到多大也是值得思考的一个问题。
扩容的太小,会导致频繁扩容,浪费性能。
扩容的太大,会导致资源的浪费。
其实这个最好的方案是结合我们实际的业务,不过这部分对用户是透明的。
一般是扩容为原来的两倍。
4.为什么需要扩容? 我们在实现 ArrayList 的时候需要扩容,因为数据放不下了。
我们知道 HashMap 的底层是数组 + 链表(红黑树)的数据结构。
那么会存在放不下的情况吗?
个人理解实际上不会。因为链表可以一直加下去。
那为什么需要扩容呢?
实际上更多的是处于性能的考虑。我们使用 HashMap 就是为了提升性能,如果一直不扩容,可以理解为元素都 hash 到相同的 bucket 上,这时就退化成了一个链表。
这会导致查询等操作性能大大降低。
缩容 1.什么时候判断? 看了前面的扩容,我们比较直观地方式是在用户 remove 元素的时候执行是否需要缩容。
不过 redis 并不完全等同于传统的 HashMap,还有数据的淘汰和过期,这些是对用户透明的。
redis 采用的方式实际上是一个定时任务。
个人理解内存缩容很重要,但是没有那么紧急,我们可以 1min 扫描一次,这样可以节省机器资源。
实际工作中,一般 redis 的内存都是逐步上升的,或者稳定在一个范围内,很少去大批量删除数据。(除非数据搞错了,我就遇到过一次,数据同步错地方了)。
所以数据删除,一般几分钟内给用户一个反馈就行。
知其然,知其所以然。
我们懂得了这个道理也就懂得了为什么有时候删除 redis 的几百万 keys,内存也不是直接降下来的原因。
2.缩容的条件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 void tryResizeHashTables (int dbid) { if (htNeedsResize(server.db[dbid].dict)) dictResize(server.db[dbid].dict); if (htNeedsResize(server.db[dbid].expires)) dictResize(server.db[dbid].expires); } #define HASHTABLE_MIN_FILL 10 /* Minimal hash table fill 10% */ int htNeedsResize (dict *dict) { long long size, used; size = dictSlots(dict); used = dictSize(dict); return (size > DICT_HT_INITIAL_SIZE && (used*100 /size < HASHTABLE_MIN_FILL)); } int dictResize (dict *d) { int minimal; if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR; minimal = d->ht[0 ].used; if (minimal < DICT_HT_INITIAL_SIZE) minimal = DICT_HT_INITIAL_SIZE; return dictExpand(d, minimal); }
和扩容类似,不过这里的缩容比例不是 5 倍,而是当哈希表保存的key数量与哈希表的大小的比例小于 10% 时需要缩容。
3.缩容到多少? 最简单的方式是直接变为原来的一半,不过这么做有时候也不是那么好用。
redis 是缩容后的大小为第一个大于等于当前key数量的2的n次方。
这个可能不太好理解,举几个数字就懂了:
keys数量
缩容大小
3
4
4
4
5
8
9
16
主要保障以下3点:
(1)缩容之后,要大于等于 key 的数量
(2)尽可能的小,节约内存
(3)2 的倍数。
第三个看过 HashMap 源码讲解的小伙伴应该深有体会。
当然也不能太小,redis 限制的最小为 4。
实际上如果 redis 中只放 4 个 key,实在是杀鸡用牛刀,一般不会这么小。
我们在实现的时候,直接参考 jdk 好了,给个最小值限制 8。
4.为什么需要缩容? 最核心的目的就是为了节约内存,其实还有一个原因,叫 small means fast(小即是快——老马)。
渐进式 ReHash 实现的思考 好了,扩容和缩容就聊到这里,那么这个渐进式 rehash 到底怎么一个渐进法?
1.扩容前 不需要扩容时应该有至少需要初始化两个元素:
1 2 3 4 hashtable[0 ] = new HashTable(size); hashIndex=-1 ; hashtable[1 ] = null ;
hashtable 中存储着当前的元素信息,hashIndex=-1 标识当前没有在进行扩容。
2.扩容准备 当需要扩容的时候,我们再去创建一个 hashtable[1],并且 size 是原来的 2倍。
1 2 3 4 5 hashtable[0 ] = new HashTable(size); hashtable[1 ] = new HashTable(2 * size); hashIndex=-1 ;
主要是为了节约内存,使用惰性初始化的方式创建 hashtable。
3.扩容时 调整 hashIndex=0…size,逐步去 rehash 到新的 hashtable[1]
新的插入全部放入到 hashtable[1]
4.扩容后 扩容后我们应该把 hashtable[0] 的值更新为 hashtable[1],并且释放掉 hashtable[1] 的资源。
并且设置 hashIndex=-1,标识已经 rehash 完成
1 2 3 4 hashtable[0 ] = hashtable[1 ]; hashIndex=-1 ; hashtable[1 ] = null ;
这样整体的实现思路就已经差不多了,光说不练假把式,我们下一节就来自己实现一个渐进式 rehash 的 HashMap
十.实现渐进式ReHash Map类定义 类定义 1 2 3 4 5 6 7 8 public class MyProgressiveReHashMap <K ,V > extends AbstractMap <K ,V > implements Map <K ,V > {}
和简易版本类似。
私有变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 private static final Log log = LogFactory.getLog(MyProgressiveReHashMap.class);private int rehashIndex = -1 ;private int capacity;private int rehashCapacity;private int size = 0 ;private final double factor = 1.0 ;private List<List<Entry<K, V>>> table;private List<List<Entry<K, V>>> rehashTable;private boolean debugMode = false ;
rehashIndex/rehashCapacity/rehashTable 这三个值都是我们在进行渐进式实现的时候需要使用的值。
构造器 主要是一些值的初始化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public MyProgressiveReHashMap () { this (8 ); } public MyProgressiveReHashMap (int capacity) { this (capacity, false ); } public MyProgressiveReHashMap (int capacity, boolean debugMode) { this .capacity = capacity; this .table = new ArrayList<>(capacity); for (int i = 0 ; i < capacity; i++) { this .table.add(i, new ArrayList<Entry<K, V>>()); } this .debugMode = debugMode; this .rehashIndex = -1 ; this .rehashCapacity = -1 ; this .rehashTable = null ; }
put() 方法 这个方法相对难度比较大:
put() 的过程可以见方法的注释。
需要考虑是否为 rehash 阶段,还需要考虑是否为更新。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 @Override public V put (K key, V value) { boolean isInRehash = isInReHash(); if (!isInRehash) { Pair<Boolean, V> pair = updateTableInfo(key, value, this .table, this .capacity); if (pair.getValueOne()) { V oldVal = pair.getValueTwo(); if (debugMode) { log.debug("不处于渐进式 rehash,此次为更新操作。key: {}, value: {}" , key, value); printTable(this .table); } return oldVal; } else { return this .createNewEntry(key, value); } } else { if (debugMode) { log.debug("当前处于渐进式 rehash 阶段,额外执行一次渐进式 rehash 的动作" ); } rehashToNew(); Pair<Boolean, V> pair = updateTableInfo(key, value, this .table, this .capacity); if (pair.getValueOne()) { V oldVal = pair.getValueTwo(); if (debugMode) { log.debug("此次为更新 table 操作。key: {}, value: {}" , key, value); printTable(this .table); } return oldVal; } Pair<Boolean, V> pair2 = updateTableInfo(key, value, this .rehashTable, this .rehashCapacity); if (pair2.getValueOne()) { V oldVal = pair2.getValueTwo(); if (debugMode) { log.debug("此次为更新 rehashTable 操作。key: {}, value: {}" , key, value); printTable(this .table); } return oldVal; } return this .createNewEntry(key, value); } }
1.是否为 rehash 阶段 这个实现比较简单,就是判断 rehashIndex 是否为 -1:
1 2 3 4 5 6 7 private boolean isInReHash () { return rehashIndex != -1 ; }
2.更新列表信息 这里为了复用,对方法进行了抽象。可以同时使用到 table 和 rehashTable 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 private Pair<Boolean, V> updateTableInfo (K key, V value, final List<List<Entry<K,V>>> table, final int tableCapacity) { int hash = HashUtil.hash(key); int index = HashUtil.indexFor(hash, tableCapacity); List<Entry<K,V>> entryList = new ArrayList<>(); if (index < table.size()) { entryList = table.get(index); } for (Entry<K,V> entry : entryList) { final K entryKey = entry.getKey(); if (ObjectUtil.isNull(key, entryKey) || key.equals(entryKey)) { final V oldValue = entry.getValue(); entry.setValue(value); return Pair.of(true , oldValue); } } return Pair.of(false , null ); }
这个和以前基本是类似的。
返回结果时,为了同时保存是否为更新,以及更新的 value 值。所以使用了 Pair 工具类。
3.插入新的元素 插入方法也比较麻烦,需要区分是否处于渐进式 rehash 阶段。还要考虑是否需要扩容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 private V createNewEntry (final K key, final V value) { Entry<K,V> entry = new DefaultMapEntry<>(key, value); int hash = HashUtil.hash(key); if (isInReHash()) { int index = HashUtil.indexFor(hash, this .rehashCapacity); List<Entry<K,V>> list = this .rehashTable.get(index); list.add(entry); if (debugMode) { log.debug("目前处于 rehash 中,元素直接插入到 rehashTable 中。" ); printTable(this .rehashTable); } } if (isNeedExpand()) { rehash(); int index = HashUtil.indexFor(hash, this .rehashCapacity); List<Entry<K,V>> list = this .rehashTable.get(index); list.add(entry); if (debugMode) { log.debug("目前处于 rehash 中,元素直接插入到 rehashTable 中。" ); printTable(this .rehashTable); } } else { int index = HashUtil.indexFor(hash, this .capacity); List<Entry<K,V>> list = this .table.get(index); list.add(entry); if (debugMode) { log.debug("目前不处于 rehash 中,元素直接插入到 table 中。" ); printTable(this .table); } } this .size++; return value; }
是否需要扩容的方法也比较简单:
1 2 3 4 5 6 7 8 9 10 11 private boolean isNeedExpand () { double rate = size*1.0 / capacity*1.0 ; return rate >= factor && !isInReHash(); }
不过我们这次添加了一个不要处于渐进式 rehash 过程中。
其中 rehash 的实现也发生了很大的变化,具体实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private void rehash () { if (isInReHash()) { if (debugMode) { log.debug("当前处于渐进式 rehash 阶段,不重复进行 rehash!" ); } return ; } this .rehashIndex = -1 ; this .rehashCapacity = 2 *capacity; this .rehashTable = new ArrayList<>(this .rehashCapacity); for (int i = 0 ; i < rehashCapacity; i++) { rehashTable.add(i, new ArrayList<Entry<K, V>>()); } rehashToNew(); }
渐进式更新的方法,可以在 get/put/remove 等操作时,执行附加操作时使用。
所以单独抽成了一个方法,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 private void rehashToNew () { rehashIndex++; List<Entry<K, V>> list = table.get(rehashIndex); for (Entry<K, V> entry : list) { int hash = HashUtil.hash(entry); int index = HashUtil.indexFor(hash, rehashCapacity); List<Entry<K,V>> newList = rehashTable.get(index); newList.add(entry); rehashTable.set(index, newList); } table.set(rehashIndex, new ArrayList<Entry<K, V>>()); if (rehashIndex == (table.size()-1 )) { this .capacity = this .rehashCapacity; this .table = this .rehashTable; this .rehashIndex = -1 ; this .rehashCapacity = -1 ; this .rehashTable = null ; if (debugMode) { log.debug("渐进式 rehash 已经完成。" ); printTable(this .table); }p } else { if (debugMode) { log.debug("渐进式 rehash 处理中, 目前 index:{} 已完成" , rehashIndex); printAllTable(); } } }
get() 操作 渐进式 rehash 将动作分散到每一个操作中,我们对 get 方法进行重写,当做一个例子。其他的方法如果实现也是类似的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Override public V get (Object key) { if (isInReHash()) { if (debugMode) { log.debug("当前处于渐进式 rehash 状态,额外执行一次操作" ); rehashToNew(); } } V result = getValue(key, this .table); if (result != null ) { return result; } if (isInReHash()) { return getValue(key, this .rehashTable); } return null ; }
测试 我们历经千辛万苦,终于实现了一个简单版本的渐进式 hash map。
下面来测试一下功能是否符合我们的预期。
1.put 操作 1 2 3 Map<String, String> map = new MyProgressiveReHashMap<>(2 , true ); map.put("1" , "1" ); map.put("1" , "2" );
日志:
1 2 3 4 [DEBUG] [2020 -10 -11 21 :30 :15.072 ] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.createNewEntry] - 目前不处于 rehash 中,元素直接插入到 table 中。 {1 : 1 } [DEBUG] [2020 -10 -11 21 :30 :15.076 ] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.put] - 不处于渐进式 rehash,此次为更新操作。key: 1 , value: 2 {1 : 2 }
第一次是插入,第二次是更新。
这里都没有触发扩容,下面我们看一下触发扩容的情况。
2.扩容测试 1 2 3 4 5 6 7 8 Map<String, String> map = new MyProgressiveReHashMap<>(2 , true ); map.put("1" , "1" ); map.put("2" , "2" ); map.put("3" , "3" ); Assert.assertEquals("1" , map.get("1" )); Assert.assertEquals("2" , map.get("2" )); Assert.assertEquals("3" , map.get("3" ));
日志如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 [DEBUG] [2020 -10 -11 21 :31 :12.559 ] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.createNewEntry] - 目前不处于 rehash 中,元素直接插入到 table 中。 {1 : 1 } [DEBUG] [2020 -10 -11 21 :31 :12.560 ] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.createNewEntry] - 目前不处于 rehash 中,元素直接插入到 table 中。 {2 : 2 } {1 : 1 } [DEBUG] [2020 -10 -11 21 :31 :12.563 ] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.rehashToNew] - 渐进式 rehash 处理中, 目前 index:0 已完成 原始 table 信息: {1 : 1 } 新的 table 信息: {2 : 2 } [DEBUG] [2020 -10 -11 21 :31 :12.563 ] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.createNewEntry] - 目前处于 rehash 中,元素直接插入到 rehashTable 中。 {2 : 2 } {3 : 3 } [DEBUG] [2020 -10 -11 21 :31 :12.564 ] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.get] - 当前处于渐进式 rehash 状态,额外执行一次操作 [DEBUG] [2020 -10 -11 21 :31 :12.564 ] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.rehashToNew] - 渐进式 rehash 已经完成。 {2 : 2 } {1 : 1 } {3 : 3 }
当放入元素【3】的时候,已经触发了 rehash。
(1)第一次渐进式 rehash 将 table[0] 的元素 rehash 到了新的节点。
(2)插入的元素直接插入到 rehashTable 中
(3)get 操作时,额外触发一次 rehash,然后所有的 rehash 已经完成。
参考资料 Fluent Interface 流式接口