一.固定大小缓存实现 缓存接口定义 为了兼容 Map,我们定义缓存接口继承自 Map 接口。
1 2 3 4 5 public  interface  ICache <K , V > extends  Map <K , V >  {} 
 
核心实现 我们主要看一下 put 时的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Override public  V put (K key, V value)   {         CacheEvictContext<K,V> context = new  CacheEvictContext<>();     context.key(key).size(sizeLimit).cache(this );          cacheEvict.evict(context);          if (isSizeLimit()) {         throw  new  CacheRuntimeException("当前队列已满,数据添加失败!" );     }          return  map.put(key, value); } 
 
这里我们可以让用户动态指定大小,但是指定大小肯就要有对应的淘汰策略。
否则,固定大小的 map 肯定无法放入元素。
淘汰策略 淘汰策略可以有多种,比如 LRU/LFU/FIFO 等等,我们此处实现一个最基本的 FIFO。
所有实现以接口的方式实现,便于后期灵活替换。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public  class  CacheEvictFIFO <K ,V > implements  ICacheEvict <K ,V >  {         private  Queue<K> queue = new  LinkedList<>();     @Override      public  void  evict (ICacheEvictContext<K, V> context)   {         final  ICache<K,V> cache = context.cache();                  if (cache.size() >= context.size()) {             K evictKey = queue.remove();                          cache.remove(evictKey);         }                  final  K key = context.key();         queue.add(key);     } } 
 
FIFO 比较简单,我们使用一个队列,存储每一次放入的元素,当队列超过最大限制时,删除最早的元素。
引导类 为了便于用户使用,我们实现类似于 guava 的引导类。
所有参数都提供默认值,使用 fluent 流式写法,提升用户体验。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 public  final  class  CacheBs <K ,V >  {    private  CacheBs ()  {}          public  static  <K,V> CacheBs<K,V> newInstance ()   {         return  new  CacheBs<>();     }          private  Map<K,V> map = new  HashMap<>();          private  int  size = Integer.MAX_VALUE;          private  ICacheEvict<K,V> evict = CacheEvicts.fifo();          public  CacheBs<K, V> map (Map<K, V> map)   {         ArgUtil.notNull(map, "map" );         this .map = map;         return  this ;     }          public  CacheBs<K, V> size (int  size)   {         ArgUtil.notNegative(size, "size" );         this .size = size;         return  this ;     }          public  CacheBs<K, V> evict (ICacheEvict<K, V> evict)   {         this .evict = evict;         return  this ;     }          public  ICache<K,V> build ()   {         CacheContext<K,V> context = new  CacheContext<>();         context.cacheEvict(evict);         context.map(map);         context.size(size);         return  new  Cache<>(context);     } } 
 
测试使用 1 2 3 4 5 6 7 8 9 ICache<String, String> cache = CacheBs.<String,String>newInstance()         .size(2 )         .build(); cache.put("1" , "1" ); cache.put("2" , "2" ); cache.put("3" , "3" ); cache.put("4" , "4" ); Assert.assertEquals(2 , cache.size()); System.out.println(cache.keySet()); 
 
默认为先进先出的策略,此时输出 keys,内容如下:
 
二.LRU缓存淘汰策略 上面默认使用FIFO淘汰策略即先进先淘汰,下我们来开发LRU缓存淘汰策略即淘汰最近最少使用
LRU基本原理 1.LRU 是什么 LRU 是由 Least Recently Used 的首字母组成,表示最近最少使用的含义,一般使用在对象淘汰算法上。
也是比较常见的一种淘汰算法。
其核心思想是如果数据最近被访问过,那么将来被访问的几率也更高 。
2.连续性 在计算机科学中,有一个指导准则:连续性准则。
时间连续性:对于信息的访问,最近被访问过,被再次访问的可能性会很高。缓存就是基于这个理念进行数据淘汰的。
空间连续性:对于磁盘信息的访问,将很有可能访问连续的空间信息。所以会有 page 预取来提升性能。
3.实现步骤 
新数据插入到链表头部; 
每当缓存命中(即缓存数据被访问),则将数据移到链表头部; 
当链表满的时候,将链表尾部的数据丢弃。 
 
其实比较简单,比起 FIFO 的队列,我们引入一个链表实现即可。
5.LRU实现步骤疑点 我们针对上面的 3 句话,逐句考虑一下,看看有没有值得优化点或者一些坑。
如何判断是新数据? 
新数据插入到链表头部; 
我们使用的是链表。
判断新数据最简单的方法就是遍历是否存在,对于链表,这是一个 O(n) 的时间复杂度。
其实性能还是比较差的。
当然也可以考虑空间换时间,比如引入一个 set 之类的,不过这样对空间的压力会加倍。
什么是缓存命中 
每当缓存命中(即缓存数据被访问),则将数据移到链表头部; 
put(key,value) 的情况,就是新元素。如果已有这个元素,可以先删除,再加入,参考上面的处理。
get(key) 的情况,对于元素访问,删除已有的元素,将新元素放在头部。
remove(key) 移除一个元素,直接删除已有元素。
keySet() valueSet() entrySet() 这些属于无差别访问,我们不对队列做调整。
移除 
当链表满的时候,将链表尾部的数据丢弃。 
链表满只有一种场景,那就是添加元素的时候,也就是执行 put(key, value) 的时候。
直接删除对应的 key 即可。
Java 代码实现 1.接口定义 和 FIFO 的接口保持一致,调用地方也不变
为了后续 LRU/LFU 实现,新增 remove/update 两个方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public  interface  ICacheEvict <K , V >  {         boolean  evict (final  ICacheEvictContext<K, V> context)  ;          void  update (final  K key)  ;          void  remove (final  K key)  ; } 
 
2.LRU 实现 直接基于 LinkedList 实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public  class  CacheEvictLRU <K ,V > implements  ICacheEvict <K ,V >  {    private  static  final  Log log = LogFactory.getLog(CacheEvictLRU.class);          private  final  List<K> list = new  LinkedList<>();     @Override      public  boolean  evict (ICacheEvictContext<K, V> context)   {         boolean  result = false ;         final  ICache<K,V> cache = context.cache();                  if (cache.size() >= context.size()) {             K evictKey = list.get(list.size()-1 );                          cache.remove(evictKey);             result = true ;         }         return  result;     }          @Override      public  void  update (final  K key)   {         this .list.remove(key);         this .list.add(0 , key);     }          @Override      public  void  remove (final  K key)   {         this .list.remove(key);     } } 
 
实现比较简单,相对 FIFO 多了三个方法:
update():我们做一点简化,认为只要是访问,就是删除,然后插入到队首。
remove():删除就是直接删除。
这三个方法是用来更新最近使用情况的。
那什么时候调用呢?
3.注解属性 为了保证核心流程,我们基于注解实现。
添加属性:
1 2 3 4 5 6 7 boolean  evict ()  default  false  ;
 
4.注解使用 有哪些方法需要使用?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Override @CacheInterceptor(refresh = true, evict = true) public  boolean  containsKey (Object key)   {    return  map.containsKey(key); } @Override @CacheInterceptor(evict = true) @SuppressWarnings("unchecked") public  V get (Object key)   {         K genericKey = (K) key;     this .expire.refreshExpire(Collections.singletonList(genericKey));     return  map.get(key); } @Override @CacheInterceptor(aof = true, evict = true) public  V put (K key, V value)   {     } @Override @CacheInterceptor(aof = true, evict = true) public  V remove (Object key)   {    return  map.remove(key); } 
 
5.注解驱除拦截器实现 执行顺序:放在方法之后更新,不然每次当前操作的 key 都会被放在最前面。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public  class  CacheInterceptorEvict <K ,V > implements  ICacheInterceptor <K , V >  {    private  static  final  Log log = LogFactory.getLog(CacheInterceptorEvict.class);     @Override      public  void  before (ICacheInterceptorContext<K,V> context)   {     }     @Override      @SuppressWarnings("all")      public  void  after (ICacheInterceptorContext<K,V> context)   {         ICacheEvict<K,V> evict = context.cache().evict();         Method method = context.method();         final  K key = (K) context.params()[0 ];         if ("remove" .equals(method.getName())) {             evict.remove(key);         } else  {             evict.update(key);         }     } } 
 
我们只对 remove 方法做下特判,其他方法都使用 update 更新信息。
参数直接取第一个参数。
测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 ICache<String, String> cache = CacheBs.<String,String>newInstance()         .size(3 )         .evict(CacheEvicts.<String, String>lru())         .build(); cache.put("A" , "hello" ); cache.put("B" , "world" ); cache.put("C" , "FIFO" ); cache.get("A" ); cache.put("D" , "LRU" ); Assert.assertEquals(3 , cache.size()); System.out.println(cache.keySet()); 
 
 
通过 removeListener 日志也可以看到 B 被移除了:
1 [DEBUG] [2020 -10 -02  21 :33 :44.578 ] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: 
 
三.LRU缓存策略的优化 数据结构选择 1.基于数组 方案:为每一个数据附加一个额外的属性——时间戳,当每一次访问数据时,更新该数据的时间戳至当前时间。
当数据空间已满后,则扫描整个数组,淘汰时间戳最小的数据。
不足:维护时间戳需要耗费额外的空间,淘汰数据时需要扫描整个数组。
这个时间复杂度太差,空间复杂度也不好。
2.基于长度有限的双向链表 方案:访问一个数据时,当数据不在链表中,则将数据插入至链表头部,如果在链表中,则将该数据移至链表头部。当数据空间已满后,则淘汰链表最末尾的数据。
不足:插入数据或取数据时,需要扫描整个链表。
这个就是我们上一节实现的方式,缺点还是很明显,每次确认元素是否存在,都要消耗 O(n) 的时间复杂度去查询。
3.基于双向链表和哈希表 方案:为了改进上面需要扫描链表的缺陷,配合哈希表,将数据和链表中的节点形成映射,将插入操作和读取操作的时间复杂度从O(N)降至O(1)
缺点:这个使我们上一节提到的优化思路,不过还是有缺点的,那就是空间复杂度翻倍。
4.数据结构的选择总结 (1)基于数组的实现
这里不建议选择 array 或者 ArrayList,因为读取的时间复杂度为 O(1),但是更新相对是比较慢的,虽然 jdk 使用的是 System.arrayCopy。
(2)基于双向链表的实现
如果我们选择链表,HashMap 中还是不能简单的存储 key, 和对应的下标。
因为链表的遍历,实际上还是 O(n) 的,双向链表理论上可以优化一半,但是这并不是我们想要的 O(1) 效果。
(3)基于双向链表 + Map实现
双向链表我们保持不变。
Map 中 key 对应的值我们放双向链表的节点信息。
那实现方式就变成了实现一个双向链表。
基于自定义双向链表实现 1.节点定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public  class  DoubleListNode <K ,V >  {         private  K key;          private  V value;          private  DoubleListNode<K,V> pre;          private  DoubleListNode<K,V> next;      } 
 
2.核心代码实现 我们保持和原来的接口不变,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 public  class  CacheEvictLruDoubleListMap <K ,V > extends  AbstractCacheEvict <K ,V >  {    private  static  final  Log log = LogFactory.getLog(CacheEvictLruDoubleListMap.class);          private  DoubleListNode<K,V> head;          private  DoubleListNode<K,V> tail;          private  Map<K, DoubleListNode<K,V>> indexMap;     public  CacheEvictLruDoubleListMap ()   {         this .indexMap = new  HashMap<>();         this .head = new  DoubleListNode<>();         this .tail = new  DoubleListNode<>();         this .head.next(this .tail);         this .tail.pre(this .head);     }     @Override      protected  ICacheEntry<K, V> doEvict (ICacheEvictContext<K, V> context)   {         ICacheEntry<K, V> result = null ;         final  ICache<K,V> cache = context.cache();                  if (cache.size() >= context.size()) {                          DoubleListNode<K,V> tailPre = this .tail.pre();             if (tailPre == this .head) {                 log.error("当前列表为空,无法进行删除" );                 throw  new  CacheRuntimeException("不可删除头结点!" );             }             K evictKey = tailPre.key();             V evictValue = cache.remove(evictKey);             result = new  CacheEntry<>(evictKey, evictValue);         }         return  result;     }          @Override      public  void  update (final  K key)   {                  this .remove(key);                                    DoubleListNode<K,V> newNode = new  DoubleListNode<>();         newNode.key(key);         DoubleListNode<K,V> next = this .head.next();         this .head.next(newNode);         newNode.pre(this .head);         next.pre(newNode);         newNode.next(next);                  indexMap.put(key, newNode);     }          @Override      public  void  remove (final  K key)   {         DoubleListNode<K,V> node = indexMap.get(key);         if (ObjectUtil.isNull(node)) {             return ;         }                                    DoubleListNode<K,V> pre = node.pre();         DoubleListNode<K,V> next = node.next();         pre.next(next);         next.pre(pre);                  this .indexMap.remove(key);     } } 
 
实现起来不难,就是一个简易版本的双向列表。
只是获取节点的时候,借助了一下 map,让时间复杂度降低为 O(1)。
3.测试结果 我们验证一下自己的实现:
测试代码 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ICache<String, String> cache = CacheBs.<String,String>newInstance()         .size(3 )         .evict(CacheEvicts.<String, String>lruDoubleListMap())         .build(); cache.put("A" , "hello" ); cache.put("B" , "world" ); cache.put("C" , "FIFO" ); cache.get("A" ); cache.put("D" , "LRU" ); Assert.assertEquals(3 , cache.size()); System.out.println(cache.keySet()); 
 
日志 
1 2 [DEBUG] [2020 -10 -03  09:37 :41.007 ] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict [D, A, C] 
 
因为我们访问过一次 A,所以 B 已经变成最少被访问的元素。
基于 LinkedHashMap 实现 实际上,LinkedHashMap 本身就是对于 list 和 hashMap 的一种结合的数据结构,我们可以直接使用 jdk 中 LinkedHashMap 去实现。
1.直接实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public  class  LRUCache  extends  LinkedHashMap   {    private  int  capacity;     public  LRUCache (int  capacity)   {                  super (16 , 0.75f , true );         this .capacity = capacity;     }     @Override      protected  boolean  removeEldestEntry (Map.Entry eldest)   {         return  super .size() >= capacity;     } } 
 
默认LinkedHashMap并不会淘汰数据,所以我们重写了它的removeEldestEntry()方法,当数据数量达到预设上限后,淘汰数据,accessOrder设为true意为按照访问的顺序排序。
整个实现的代码量并不大,主要都是应用LinkedHashMap的特性。
2.简单改造 我们对这个方法简单改造下,让其适应我们定义的接口。
3.测试结果 测试代码 
1 2 3 4 5 6 7 8 9 10 11 12 13 ICache<String, String> cache = CacheBs.<String,String>newInstance()         .size(3 )         .evict(CacheEvicts.<String, String>lruLinkedHashMap())         .build(); cache.put("A" , "hello" ); cache.put("B" , "world" ); cache.put("C" , "FIFO" ); cache.get("A" ); cache.put("D" , "LRU" ); Assert.assertEquals(3 , cache.size()); System.out.println(cache.keySet()); 
 
日志 
1 2 [DEBUG] [2020 -10 -03  10 :20 :57.842 ] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict [D, A, C] 
 
LRU扩展算法 当存在热点数据时,LRU的效率很好,但偶发性的、周期性的批量操作会导致LRU命中率急剧下降,缓存污染情况比较严重。
1. LRU-K LRU-K中的K代表最近使用的次数,因此LRU可以认为是LRU-1。
LRU-K的主要目的是为了解决LRU算法“缓存污染”的问题,其核心思想是将“最近使用过1次”的判断标准扩展为“最近使用过K次”。
相比LRU,LRU-K需要多维护一个队列,用于记录所有缓存数据被访问的历史。只有当数据的访问次数达到K次的时候,才将数据放入缓存。
当需要淘汰数据时,LRU-K会淘汰第K次访问时间距当前时间最大的数据。
数据第一次被访问时,加入到历史访问列表,如果数据在访问历史列表中没有达到K次访问,则按照一定的规则(FIFO,LRU)淘汰;
当访问历史队列中的数据访问次数达到K次后,将数据索引从历史队列中删除,将数据移到缓存队列中,并缓存数据,缓存队列重新按照时间排序;
缓存数据队列中被再次访问后,重新排序,需要淘汰数据时,淘汰缓存队列中排在末尾的数据,即“淘汰倒数K次访问离现在最久的数据”。
LRU-K具有LRU的优点,同时还能避免LRU的缺点,实际应用中LRU-2是综合最优的选择。
由于LRU-K还需要记录那些被访问过、但还没有放入缓存的对象,因此内存消耗会比LRU要多。
2. two queue Two queues(以下使用2Q代替)算法类似于LRU-2,不同点在于2Q将LRU-2算法中的访问历史队列(注意这不是缓存数据的)改为一个FIFO缓存队列,即:2Q算法有两个缓存队列,一个是FIFO队列,一个是LRU队列。
当数据第一次访问时,2Q算法将数据缓存在FIFO队列里面,当数据第二次被访问时,则将数据从FIFO队列移到LRU队列里面,两个队列各自按照自己的方法淘汰数据。
新访问的数据插入到FIFO队列中,如果数据在FIFO队列中一直没有被再次访问,则最终按照FIFO规则淘汰;
如果数据在FIFO队列中再次被访问到,则将数据移到LRU队列头部,如果数据在LRU队列中再次被访问,则将数据移动LRU队列头部,LRU队列淘汰末尾的数据。
3. Multi Queue(MQ) MQ算法根据访问频率将数据划分为多个队列,不同的队列具有不同的访问优先级,其核心思想是:优先缓存访问次数多的数据 。
详细的算法结构图如下,Q0,Q1….Qk代表不同的优先级队列,Q-history代表从缓存中淘汰数据,但记录了数据的索引和引用次数的队列:
新插入的数据放入Q0,每个队列按照LRU进行管理,当数据的访问次数达到一定次数,需要提升优先级时,将数据从当前队列中删除,加入到高一级队列的头部;为了防止高优先级数据永远不会被淘汰,当数据在指定的时间里没有被访问时,需要降低优先级,将数据从当前队列删除,加入到低一级的队列头部;需要淘汰数据时,从最低一级队列开始按照LRU淘汰,每个队列淘汰数据时,将数据从缓存中删除,将数据索引加入Q-history头部。
如果数据在Q-history中被重新访问,则重新计算其优先级,移到目标队列头部。
Q-history按照LRU淘汰数据的索引。
MQ需要维护多个队列,且需要维护每个数据的访问时间,复杂度比LRU高。
4.LRU算法对比 
对比点 
对比 
 
 
命中率 
LRU-2 > MQ(2) > 2Q > LRU 
 
复杂度 
LRU-2 > MQ(2) > 2Q > LRU 
 
代价 
LRU-2  > MQ(2) > 2Q > LRU 
 
实际上上面的几个算法,思想上大同小异。
核心目的:解决批量操作导致热点数据失效,缓存被污染的问题。
实现方式:增加一个队列,用来保存只访问一次的数据,然后根据次数不同,放入到 LRU 中。
只访问一次的队列,可以是 FIFO 队列,可以是 LRU,我们来实现一下 2Q 和 LRU-2 两种实现。
2Q算法实现 1.实现思路 实际上就是我们以前的 FIFO + LRU 二者的结合。
2.基本属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public  class  CacheEvictLru2Q <K ,V > extends  AbstractCacheEvict <K ,V >  {    private  static  final  Log log = LogFactory.getLog(CacheEvictLru2Q.class);          private  static  final  int  LIMIT_QUEUE_SIZE = 1024 ;          private  Queue<K> firstQueue;          private  DoubleListNode<K,V> head;          private  DoubleListNode<K,V> tail;          private  Map<K, DoubleListNode<K,V>> lruIndexMap;     public  CacheEvictLru2Q ()   {         this .firstQueue = new  LinkedList<>();         this .lruIndexMap = new  HashMap<>();         this .head = new  DoubleListNode<>();         this .tail = new  DoubleListNode<>();         this .head.next(this .tail);         this .tail.pre(this .head);     } } 
 
3.数据淘汰 数据淘汰的逻辑:
当缓存大小,已经达到最大限制时执行:
(1)优先淘汰 firstQueue 中的数据
(2)如果 firstQueue 中数据为空,则淘汰 lruMap 中的数据信息。
这里有一个假设:我们认为被多次访问的数据,重要性高于被只访问了一次的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Override protected  ICacheEntry<K, V> doEvict (ICacheEvictContext<K, V> context)   {    ICacheEntry<K, V> result = null ;     final  ICache<K,V> cache = context.cache();          if (cache.size() >= context.size()) {         K evictKey = null ;                  if (!firstQueue.isEmpty()) {             evictKey = firstQueue.remove();         } else  {                          DoubleListNode<K,V> tailPre = this .tail.pre();             if (tailPre == this .head) {                 log.error("当前列表为空,无法进行删除" );                 throw  new  CacheRuntimeException("不可删除头结点!" );             }             evictKey = tailPre.key();         }                  V evictValue = cache.remove(evictKey);         result = new  CacheEntry<>(evictKey, evictValue);     }     return  result; } 
 
4.数据删除 当数据被删除时调用:
这个逻辑和以前类似,只是多了一个 FIFO 队列的移除。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public  void  removeKey (final  K key)   {    DoubleListNode<K,V> node = lruIndexMap.get(key);          if (ObjectUtil.isNotNull(node)) {                           DoubleListNode<K,V> pre = node.pre();         DoubleListNode<K,V> next = node.next();         pre.next(next);         next.pre(pre);                  this .lruIndexMap.remove(node.key());     } else  {                  firstQueue.remove(key);     } } 
 
5.数据的更新 当数据被访问时,提升数据的优先级。
(1)如果在 lruMap 中,则首先移除,然后放入到头部
(2)如果不在 lruMap 中,但是在 FIFO 队列,则从 FIFO 队列中移除,添加到 LRU map 中。
(3)如果都不在,直接加入到 FIFO 队列中即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Override public  void  updateKey (final  K key)   {              DoubleListNode<K,V> node = lruIndexMap.get(key);     if (ObjectUtil.isNotNull(node)         || firstQueue.contains(key)) {                  this .removeKey(key);                  this .addToLruMapHead(key);         return ;     }               firstQueue.add(key); } 
 
这里我想到了一个优化点,限制 firstQueue 的一直增长,因为遍历的时间复杂度为 O(n),所以限制最大的大小为 1024。
如果超过了,则把 FIFO 中的元素先移除掉。
不过只移除 FIFO,不移除 cache,会导致二者的活跃程度不一致;
如果同时移除,但是 cache 的大小还没有满足,可能会导致超出用户的预期,这个可以作为一个优化点,暂时注释掉。
6.测试 代码 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ICache<String, String> cache = CacheBs.<String,String>newInstance()         .size(3 )         .evict(CacheEvicts.<String, String>lru2Q())         .build(); cache.put("A" , "hello" ); cache.put("B" , "world" ); cache.put("C" , "FIFO" ); cache.get("A" ); cache.put("D" , "LRU" ); Assert.assertEquals(3 , cache.size()); System.out.println(cache.keySet()); 
 
效果 
1 2 [DEBUG] [2020 -10 -03  13 :15 :50.670 ] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict [D, A, C] 
 
LRU-2算法实现 1.实现LRU简介 FIFO 中的缺点还是比较明显的,需要 O(n) 的时间复杂度做遍历。
而且命中率和 LRU-2 比起来还是会差一点。
这里 LRU map 出现了多次,我们为了方便,将 LRU map 简单的封装为一个数据结构。
我们使用双向链表+HashMap 实现一个简单版本的。
2.节点 node 节点和以前一致:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public  class  DoubleListNode <K ,V >  {         private  K key;          private  V value;          private  DoubleListNode<K,V> pre;          private  DoubleListNode<K,V> next;      } 
 
3.接口 我们根据自己的需要,暂时定义 3 个最重要的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public  interface  ILruMap <K ,V >  {         ICacheEntry<K, V> removeEldest ()  ;          void  updateKey (final  K key)  ;          void  removeKey (final  K key)  ;          boolean  isEmpty ()  ;          boolean  contains (final  K key)  ; } 
 
4.实现 我们基于 DoubleLinkedList + HashMap 实现。
就是把上一节中的实现整理一下即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 public  class  LruMapDoubleList <K ,V > implements  ILruMap <K ,V >  {    private  static  final  Log log = LogFactory.getLog(LruMapDoubleList.class);          private  DoubleListNode<K,V> head;          private  DoubleListNode<K,V> tail;          private  Map<K, DoubleListNode<K,V>> indexMap;     public  LruMapDoubleList ()   {         this .indexMap = new  HashMap<>();         this .head = new  DoubleListNode<>();         this .tail = new  DoubleListNode<>();         this .head.next(this .tail);         this .tail.pre(this .head);     }     @Override      public  ICacheEntry<K, V> removeEldest ()   {                  DoubleListNode<K,V> tailPre = this .tail.pre();         if (tailPre == this .head) {             log.error("当前列表为空,无法进行删除" );             throw  new  CacheRuntimeException("不可删除头结点!" );         }         K evictKey = tailPre.key();         V evictValue = tailPre.value();         return  CacheEntry.of(evictKey, evictValue);     }          @Override      public  void  updateKey (final  K key)   {                  this .removeKey(key);                                    DoubleListNode<K,V> newNode = new  DoubleListNode<>();         newNode.key(key);         DoubleListNode<K,V> next = this .head.next();         this .head.next(newNode);         newNode.pre(this .head);         next.pre(newNode);         newNode.next(next);                  indexMap.put(key, newNode);     }          @Override      public  void  removeKey (final  K key)   {         DoubleListNode<K,V> node = indexMap.get(key);         if (ObjectUtil.isNull(node)) {             return ;         }                                    DoubleListNode<K,V> pre = node.pre();         DoubleListNode<K,V> next = node.next();         pre.next(next);         next.pre(pre);                  this .indexMap.remove(key);     }     @Override      public  boolean  isEmpty ()   {         return  indexMap.isEmpty();     }     @Override      public  boolean  contains (K key)   {         return  indexMap.containsKey(key);     } } 
 
5.基本属性 LRU 的实现保持不变。我们直接将 FIFO 替换为 LRU map 即可。
为了便于理解,我们将 FIFO 对应为 firstLruMap,用来存放用户只访问了一次的元素。
将原来的 LRU 中存入访问了 2 次及其以上的元素。
其他逻辑和 2Q 保持一致。
定义两个 LRU,用来分别存储访问的信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public  class  CacheEvictLru2 <K ,V > extends  AbstractCacheEvict <K ,V >  {    private  static  final  Log log = LogFactory.getLog(CacheEvictLru2.class);          private  final  ILruMap<K,V> firstLruMap;          private  final  ILruMap<K,V> moreLruMap;     public  CacheEvictLru2 ()   {         this .firstLruMap = new  LruMapDoubleList<>();         this .moreLruMap = new  LruMapDoubleList<>();     } } 
 
6.淘汰实现 和 lru 2Q 模式类似,这里我们优先淘汰 firstLruMap 中的数据信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override protected  ICacheEntry<K, V> doEvict (ICacheEvictContext<K, V> context)   {    ICacheEntry<K, V> result = null ;     final  ICache<K,V> cache = context.cache();          if (cache.size() >= context.size()) {         ICacheEntry<K,V>  evictEntry = null ;                  if (!firstLruMap.isEmpty()) {             evictEntry = firstLruMap.removeEldest();             log.debug("从 firstLruMap 中淘汰数据:{}" , evictEntry);         } else  {                          evictEntry = moreLruMap.removeEldest();             log.debug("从 moreLruMap 中淘汰数据:{}" , evictEntry);         }                  final  K evictKey = evictEntry.key();         V evictValue = cache.remove(evictKey);         result = new  CacheEntry<>(evictKey, evictValue);     }     return  result; } 
 
7.删除 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Override public  void  removeKey (final  K key)   {         if (moreLruMap.contains(key)) {         moreLruMap.removeKey(key);         log.debug("key: {} 从 moreLruMap 中移除" , key);     } else  {         firstLruMap.removeKey(key);         log.debug("key: {} 从 firstLruMap 中移除" , key);     } } 
 
8.更新 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Override public  void  updateKey (final  K key)   {         if (moreLruMap.contains(key)         || firstLruMap.contains(key)) {                  this .removeKey(key);                  moreLruMap.updateKey(key);         log.debug("key: {} 多次访问,加入到 moreLruMap 中" , key);     } else  {                  firstLruMap.updateKey(key);         log.debug("key: {} 为第一次访问,加入到 firstLruMap 中" , key);     } } 
 
实际上使用 LRU-2 的代码逻辑反而变得清晰了一些,主要是因为我们把 lruMap 作为独立的数据结构抽离了出去
9.测试 代码 
1 2 3 4 5 6 7 8 9 10 11 12 ICache<String, String> cache = CacheBs.<String,String>newInstance()         .size(3 )         .evict(CacheEvicts.<String, String>lru2Q())         .build(); cache.put("A" , "hello" ); cache.put("B" , "world" ); cache.put("C" , "FIFO" ); cache.get("A" ); cache.put("D" , "LRU" ); Assert.assertEquals(3 , cache.size()); System.out.println(cache.keySet()); 
 
日志 
为了便于定位分析,源代码实现的时候,加了一点日志。
1 2 3 4 5 6 7 8 9 [DEBUG] [2020 -10 -03  14 :39 :04.966 ] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: A 为第一次访问,加入到 firstLruMap 中 [DEBUG] [2020 -10 -03  14 :39 :04.967 ] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: B 为第一次访问,加入到 firstLruMap 中 [DEBUG] [2020 -10 -03  14 :39 :04.968 ] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: C 为第一次访问,加入到 firstLruMap 中 [DEBUG] [2020 -10 -03  14 :39 :04.970 ] [main] [c.g.h.c.c.s.e.CacheEvictLru2.removeKey] - key: A 从 firstLruMap 中移除 [DEBUG] [2020 -10 -03  14 :39 :04.970 ] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: A 多次访问,加入到 moreLruMap 中 [DEBUG] [2020 -10 -03  14 :39 :04.972 ] [main] [c.g.h.c.c.s.e.CacheEvictLru2.doEvict] - 从 firstLruMap 中淘汰数据:EvictEntry{key=B, value=null } [DEBUG] [2020 -10 -03  14 :39 :04.974 ] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict [DEBUG] [2020 -10 -03  14 :39 :04.974 ] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: D 为第一次访问,加入到 firstLruMap 中 [D, A, C] 
 
10.小结 对于 LRU 算法的改进我们主要做了两点:
(1)性能的改进,从 O(N) 优化到 O(1)
(2)批量操作的改进,避免缓存污染
四.过期功能的实现 缓存接口定义 我们首先来定义一下接口。
主要有两个:一个是多久之后过期,一个是在什么时候过期。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public  interface  ICache <K , V > extends  Map <K , V >  {         ICache<K, V> expire (final  K key, final  long  timeInMills)  ;          ICache<K, V> expireAt (final  K key, final  long  timeInMills)  ; } 
 
缓存接口实现 为了便于处理,我们将多久之后过期,进行计算。将两个问题变成同一个问题,在什么时候过期的问题。
核心的代码,主要还是看 cacheExpire 接口。
1 2 3 4 5 6 7 8 9 10 11 @Override public  ICache<K, V> expire (K key, long  timeInMills)   {    long  expireTime = System.currentTimeMillis() + timeInMills;     return  this .expireAt(key, expireTime); } @Override public  ICache<K, V> expireAt (K key, long  timeInMills)   {    this .cacheExpire.expire(key, timeInMills);     return  this ; } 
 
缓存过期接口 这里为了便于后期拓展,对于过期的处理定义为接口,便于后期灵活替换
其中 expire(final K key, final long expireAt); 就是我们方法中调用的地方。
refershExpire 属于惰性删除,需要进行刷新时才考虑,我们后面讲解。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public  interface  ICacheExpire <K ,V >  {         void  expire (final  K key, final  long  expireAt)  ;          void  refreshExpire (final  Collection<K> keyList)  ; } 
 
expire 实现原理 其实过期的实思路也比较简单:我们可以开启一个定时任务,比如 1 秒钟做一次轮训,将过期的信息清空。
1.过期信息的存储 1 2 3 4 5 6 7 8 9 10 11 private  final  Map<K, Long> expireMap = new  HashMap<>();@Override public  void  expire (K key, long  expireAt)   {    expireMap.put(key, expireAt); } 
 
我们定义一个 map,key 是对应的要过期的信息,value 存储的是过期时间。
2.轮询清理 我们固定 100ms 清理一次,每次最多清理 100 个。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private  static  final  int  LIMIT = 100 ;private  final  ICache<K,V> cache;private  static  final  ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();public  CacheExpire (ICache<K, V> cache)   {    this .cache = cache;     this .init(); } private  void  init ()   {    EXECUTOR_SERVICE.scheduleAtFixedRate(new  ExpireThread(), 100 , 100 , TimeUnit.MILLISECONDS); } 
 
这里定义了一个单线程,用于执行清空任务。
3.清空任务 这个非常简单,遍历过期数据,判断对应的时间,如果已经到期了,则执行清空操作。
为了避免单次执行时间过长,最多只处理 100 条
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private  class  ExpireThread  implements  Runnable   {    @Override      public  void  run ()   {                  if (MapUtil.isEmpty(expireMap)) {             return ;         }                  int  count = 0 ;         for (Map.Entry<K, Long> entry : expireMap.entrySet()) {             if (count >= LIMIT) {                 return ;             }             expireKey(entry);             count++;         }     } } private  void  expireKey (Map.Entry<K, Long> entry)   {    final  K key = entry.getKey();     final  Long expireAt = entry.getValue();          long  currentTime = System.currentTimeMillis();     if (currentTime >= expireAt) {         expireMap.remove(key);                  cache.remove(key);     } } 
 
惰性删除 1.出现的原因 类似于 redis,我们采用定时删除的方案,就有一个问题:可能数据清理的不及时。
那当我们查询时,可能获取到到是脏数据。
于是就有一些人就想了,当我们关心某些数据时,才对数据做对应的删除判断操作,这样压力会小很多。
算是一种折中方案。
2.需要惰性删除的方法 一般就是各种查询方法,比如我们获取 key 对应的值时
1 2 3 4 5 6 7 8 @Override @SuppressWarnings("unchecked") public  V get (Object key)   {         K genericKey = (K) key;     this .cacheExpire.refreshExpire(Collections.singletonList(genericKey));     return  map.get(key); } 
 
我们在获取之前,先做一次数据的刷新。
3.刷新的实现 实现原理也非常简单,就是一个循环,然后作删除即可。
这里加了一个小的优化:选择数量少的作为外循环。
循环集合的时间复杂度是 O(n), map.get() 的时间复杂度是 O(1);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override public  void  refreshExpire (Collection<K> keyList)   {    if (CollectionUtil.isEmpty(keyList)) {         return ;     }          if (keyList.size() <= expireMap.size()) {         for (K key : keyList) {             expireKey(key);         }     } else  {         for (Map.Entry<K, Long> entry : expireMap.entrySet()) {             this .expireKey(entry);         }     } } 
 
测试 上面的代码写完之后,我们就可以验证一下了。
1 2 3 4 5 6 7 8 9 10 11 12 13 ICache<String, String> cache = CacheBs.<String,String>newInstance()         .size(3 )         .build(); cache.put("1" , "1" ); cache.put("2" , "2" ); cache.expire("1" , 10 ); Assert.assertEquals(2 , cache.size()); TimeUnit.MILLISECONDS.sleep(50 ); Assert.assertEquals(1 , cache.size()); System.out.println(cache.keySet()); 
 
结果也符合我们的预期
五.过期功能的优化 上面的过期功能的实现中存在两个问题:
(1)keys 的选择不够随机,可能会导致每次循环 100 个结束时,真正需要过期的没有被遍历到
(2)keys 的遍历可能大部分都是无效的
所以下面以过期时间为维度对过期功能进行优化
基于时间的遍历 1.思路 我们换一种思路,让过期的时间做 key,相同时间的需要过期的信息放在一个列表中,作为 value
然后对过期时间排序,轮询的时候就可以快速判断出是否有过期的信息了。
我们每次 put 放入过期元素时,根据过期时间对元素进行排序,相同的过期时间的 Keys 放在一起。
优点:定时遍历的时候,如果时间不到当前时间,就可以直接返回了,大大降低无效遍历。
缺点:考虑到惰性删除问题,还是需要存储以删除信息作为 key 的 map 关系,这样内存基本翻倍。
2.基本属性定义 我们这里使用 TreeMap 帮助我们进行过期时间的排序,这个集合后续有时间可以详细讲解了,我大概看了下 jdk1.8 的源码,主要是通过红黑树实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public  class  CacheExpireSort <K ,V > implements  ICacheExpire <K ,V >  {         private  static  final  int  LIMIT = 100 ;          private  final  Map<Long, List<K>> sortMap = new  TreeMap<>(new  Comparator<Long>() {         @Override          public  int  compare (Long o1, Long o2)   {             return  (int ) (o1 - o2);         }     });          private  final  Map<K, Long> expireMap = new  HashMap<>();          private  final  ICache<K,V> cache; } 
 
3.放入元素时 每次存入新元素时,同时放入 sortMap 和 expireMap。
1 2 3 4 5 6 7 8 9 10 11 @Override public  void  expire (K key, long  expireAt)   {    List<K> keys = sortMap.get(expireAt);     if (keys == null ) {         keys = new  ArrayList<>();     }     keys.add(key);          sortMap.put(expireAt, keys);     expireMap.put(key, expireAt); } 
 
定时任务的执行 1.定义 我们定义一个定时任务,100ms 执行一次。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private  static  final  ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();public  CacheExpireSort (ICache<K, V> cache)   {    this .cache = cache;     this .init(); } private  void  init ()   {    EXECUTOR_SERVICE.scheduleAtFixedRate(new  ExpireThread(), 100 , 100 , TimeUnit.MILLISECONDS); } 
 
2.执行任务 实现源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 private  class  ExpireThread  implements  Runnable   {    @Override      public  void  run ()   {                  if (MapUtil.isEmpty(sortMap)) {             return ;         }                  int  count = 0 ;         for (Map.Entry<Long, List<K>> entry : sortMap.entrySet()) {             final  Long expireAt = entry.getKey();             List<K> expireKeys = entry.getValue();                          if (CollectionUtil.isEmpty(expireKeys)) {                 sortMap.remove(expireAt);                 continue ;             }             if (count >= LIMIT) {                 return ;             }                          long  currentTime = System.currentTimeMillis();             if (currentTime >= expireAt) {                 Iterator<K> iterator = expireKeys.iterator();                 while  (iterator.hasNext()) {                     K key = iterator.next();                                          iterator.remove();                     expireMap.remove(key);                                          cache.remove(key);                     count++;                 }             } else  {                                  return ;             }         }     } } 
 
这里直接遍历 sortMap,对应的 key 就是过期时间,然后和当前时间对比即可。
删除的时候,需要删除 expireMap/sortMap/cache。
惰性删除刷新 惰性删除刷新时,就会用到 expireMap。
因为有时候刷新的 key 就一个,如果没有 expireMap 映射关系,可能要把 sortMap 全部遍历一遍才能找到对应的过期时间。
就是一个时间复杂度与空间复杂度衡量的问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Override public  void  refreshExpire (Collection<K> keyList)   {    if (CollectionUtil.isEmpty(keyList)) {         return ;     }                    final  int  expireSize = expireMap.size();     if (expireSize <= keyList.size()) {                  for (Map.Entry<K,Long> entry : expireMap.entrySet()) {             K key = entry.getKey();                                       this .removeExpireKey(key);         }     } else  {         for (K key : keyList) {             this .removeExpireKey(key);         }     } } private  void  removeExpireKey (final  K key)   {    Long expireTime = expireMap.get(key);     if (expireTime != null ) {         final  long  currentTime = System.currentTimeMillis();         if (currentTime >= expireTime) {             expireMap.remove(key);             List<K> expireKeys = sortMap.get(expireTime);             expireKeys.remove(key);             sortMap.put(expireTime, expireKeys);         }     } } 
 
六.RBD持久化功能的实现 缓存的持久化功能分为以下两个部分:
Cache 的内容持久化到文件或者数据库 
初始化的时候加载持久化数据 
 
持久化操作 1.持久化操作接口 为了便于灵活替换,我们定义一个持久化的接口。
1 2 3 4 5 6 7 8 9 public  interface  ICachePersist <K , V >  {         void  persist (final  ICache<K, V> cache)  ; } 
 
2.持久化操作接口实现 我们实现一个最简单的基于 json 的持久化,当然后期可以添加类似于 AOF 的持久化模式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public  class  CachePersistDbJson <K ,V > implements  ICachePersist <K ,V >  {         private  final  String dbPath;     public  CachePersistDbJson (String dbPath)   {         this .dbPath = dbPath;     }          @Override      public  void  persist (ICache<K, V> cache)   {         Set<Map.Entry<K,V>> entrySet = cache.entrySet();                  FileUtil.createFile(dbPath);                  FileUtil.truncate(dbPath);         for (Map.Entry<K,V> entry : entrySet) {             K key = entry.getKey();             Long expireTime = cache.expire().expireTime(key);                          PersistEntry<K,V> persistEntry = new  PersistEntry<>();             persistEntry.setKey(key);             persistEntry.setValue(entry.getValue());             persistEntry.setExpire(expireTime);                                       String line = JSON.toJSONString(persistEntry);                          FileUtil.write(dbPath, line, StandardOpenOption.APPEND);         }     } } 
 
3.持久化操作的触发 上面定义好了一种持久化的策略,但是没有提供对应的触发方式。
我们就采用对用户透明的设计方式:定时执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public  class  InnerCachePersist <K ,V >  {    private  static  final  Log log = LogFactory.getLog(InnerCachePersist.class);          private  final  ICache<K,V> cache;          private  final  ICachePersist<K,V> persist;          private  static  final  ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();     public  InnerCachePersist (ICache<K, V> cache, ICachePersist<K, V> persist)   {         this .cache = cache;         this .persist = persist;                  this .init();     }          private  void  init ()   {         EXECUTOR_SERVICE.scheduleAtFixedRate(new  Runnable() {             @Override              public  void  run ()   {                 try  {                     log.info("开始持久化缓存信息" );                     persist.persist(cache);                     log.info("完成持久化缓存信息" );                 } catch  (Exception exception) {                     log.error("文件持久化异常" , exception);                 }             }         }, 0 , 10 , TimeUnit.MINUTES);     } } 
 
定时执行的时间间隔为 10min。
4.测试 我们只需要在创建 Cache 时,指定我们的持久化策略即可。
1 2 3 4 5 6 ICache<String, String> cache = CacheBs.<String,String>newInstance()         .load(new  MyCacheLoad())         .persist(CachePersists.<String, String>dbJson("1.rdb" ))         .build(); Assert.assertEquals(2 , cache.size()); TimeUnit.SECONDS.sleep(5 ); 
 
为了确保文件持久化完成,我们沉睡了一会儿。
测试结果如下
生成1.rdb文件内容如下:
1 2 {"key" :"2" ,"value" :"2" } {"key" :"1" ,"value" :"1" } 
 
缓存数据的加载 1.缓存初始化接口 缓存初始化即从持久化中提取数据到缓存内存中
为了便于后期拓展,定义 ICacheLoad 接口。
1 2 3 4 5 6 7 8 9 public  interface  ICacheLoad <K , V >  {         void  load (final  ICache<K,V> cache)  ; } 
 
2.缓存初始化接口实现 我们只需要实现以下对应的加载即可,解析文件,然后初始化 cache。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public  class  CacheLoadDbJson <K ,V > implements  ICacheLoad <K ,V >  {    private  static  final  Log log = LogFactory.getLog(CacheLoadDbJson.class);          private  final  String dbPath;     public  CacheLoadDbJson (String dbPath)   {         this .dbPath = dbPath;     }     @Override      public  void  load (ICache<K, V> cache)   {         List<String> lines = FileUtil.readAllLines(dbPath);         log.info("[load] 开始处理 path: {}" , dbPath);         if (CollectionUtil.isEmpty(lines)) {             log.info("[load] path: {} 文件内容为空,直接返回" , dbPath);             return ;         }         for (String line : lines) {             if (StringUtil.isEmpty(line)) {                 continue ;             }                                       PersistEntry<K,V> entry = JSON.parseObject(line, PersistEntry.class);             K key = entry.getKey();             V value = entry.getValue();             Long expire = entry.getExpire();             cache.put(key, value);             if (ObjectUtil.isNotNull(expire)) {                 cache.expireAt(key, expire);             }         }     } } 
 
然后在初始化时使用即可。
七.AOF持久化功能的实现 Redis AOF 解析 1.为什么选择 AOF? AOF 模式的性能特别好 :
用过 kafka 的同学肯定知道,kafka 也用到了顺序写这个特性。
顺序写添加文件内容,避免了文件 IO 的随机写问题,性能基本可以和内存媲美。
AOF 的实时性更好 ,这个是相对于 RDB 模式而言的:
我们原来使用 RDB 模式,将缓存内容全部持久化,这个是比较耗时的动作,一般是几分钟持久化一次。
AOF 模式主要是针对修改内容的指令,然后将所有的指令顺序添加到文件中。这样的话,实时性会好很多,可以提升到秒级别,甚至秒级别。可以将AOF模式理解为一个操作流水表
2.AOF 的吞吐量 AOF 模式可以每次操作都进行持久化,但是这样会导致吞吐量大大下降。
提升吞吐量最常用的方式就是批量 ,这个 kafka 中也是类似的,比如我们可以 1s 持久化一次,将 1s 内的操作全部放入 buffer 中。
这里其实就是一个 trade-off 问题,实时性与吞吐量的平衡艺术。
实际业务中,1s 的误差一般都是可以接受的,所以这个也是业界比较认可的方式。
3.AOF 的异步+多线程 kafka 中所有的操作实际上都是异步+回调的方式实现的。
异步+多线程,确实可以提升操作的性能。
当然 redis 6 以前,其实一直是单线程的。那为什么性能依然这么好呢?
其实多线程也有代价,那就是线程上下文的切换是需要耗时的,保持并发的安全问题,也需要加锁,从而降低性能。
所以这里要考虑异步的收益,与付出的耗时是否成正比的问题。
4.AOF 的落盘 我们 AOF 与 RDB 模式,归根结底都是基于操作系统的文件系统做持久化的。
对于开发者而言,可能就是调用一个 api 就实现了,但是实际持久化落盘的动作并不见得就是一步完成的。
文件系统为了提升吞吐量,也会采用类似 buffer 的方式。这忽然有一点俄罗斯套娃的味道。
但是优秀的设计总是相似的,比如说缓存从 cpu 的设计中就有 L1/L2 等等,思路是一致的。
阿里的很多开源技术,都会针对操作系统的落盘做进一步的优化,这个我们后续做深入学习
5.AOF 的缺陷 大道缺一,没有银弹。
AOF 千好万好,和 RDB 对比也存在一个缺陷,那就是指令
AOF注解实现 1.接口 接口和 rdb 的保持一致
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public  interface  ICachePersist <K , V >  {         void  persist (final  ICache<K, V> cache)  ; } 
 
2.注解定义 为了和耗时统计,刷新等特性保持一致,对于操作类的动作才添加到文件中(append to file)我们也基于注解属性来指定,而不是固定写死在代码中,便于后期拓展调整
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Documented @Inherited @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public  @interface  CacheInterceptor {         boolean  aof ()  default  false  ; } 
 
我们在原来的 @CacheInterceptor 注解中添加 aof 属性,用于指定是否对操作开启 aof 模式
3.过期操作中启用AOF 类似于 spring 的事务拦截器,我们使用代理类调用 expireAt。
expire 方法就不需要添加 aof 拦截了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Override @CacheInterceptor public  ICache<K, V> expire (K key, long  timeInMills)   {    long  expireTime = System.currentTimeMillis() + timeInMills;          Cache<K,V> cachePoxy = (Cache<K, V>) CacheProxy.getProxy(this );     return  cachePoxy.expireAt(key, expireTime); } @Override @CacheInterceptor(aof = true) public  ICache<K, V> expireAt (K key, long  timeInMills)   {    this .expire.expire(key, timeInMills);     return  this ; } 
 
4.变更操作中启用AOF 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Override @CacheInterceptor(aof = true) public  V put (K key, V value)   {         CacheEvictContext<K,V> context = new  CacheEvictContext<>();     context.key(key).size(sizeLimit).cache(this );     boolean  evictResult = evict.evict(context);     if (evictResult) {                  ICacheRemoveListenerContext<K,V> removeListenerContext = CacheRemoveListenerContext.<K,V>newInstance().key(key).value(value).type(CacheRemoveType.EVICT.code());         for (ICacheRemoveListener<K,V> listener : this .removeListeners) {             listener.listen(removeListenerContext);         }     }          if (isSizeLimit()) {         throw  new  CacheRuntimeException("当前队列已满,数据添加失败!" );     }          return  map.put(key, value); } @Override @CacheInterceptor(aof = true) public  V remove (Object key)   {    return  map.remove(key); } @Override @CacheInterceptor(aof = true) public  void  putAll (Map<? extends K, ? extends V> m)   {    map.putAll(m); } @Override @CacheInterceptor(refresh = true, aof = true) public  void  clear ()   {    map.clear(); } 
 
AOF 持久化拦截实现 1.持久化对象定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public  class  PersistAofEntry   {         private  Object[] params;          private  String methodName;(      } 
 
这里我们只需要方法名,和参数对象。
暂时实现的简单一些即可
2.持久化拦截器 我们定义拦截器,当 cache 中定义的持久化类为 CachePersistAof 时,将操作的信息放入到 CachePersistAof 的 buffer 列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public  class  CacheInterceptorAof <K ,V > implements  ICacheInterceptor <K , V >  {    private  static  final  Log log = LogFactory.getLog(CacheInterceptorAof.class);     @Override      public  void  before (ICacheInterceptorContext<K,V> context)   {     }     @Override      public  void  after (ICacheInterceptorContext<K,V> context)   {                  ICache<K,V> cache = context.cache();         ICachePersist<K,V> persist = cache.persist();         if (persist instanceof  CachePersistAof) {             CachePersistAof<K,V> cachePersistAof = (CachePersistAof<K,V>) persist;             String methodName = context.method().getName();             PersistAofEntry aofEntry = PersistAofEntry.newInstance();             aofEntry.setMethodName(methodName);             aofEntry.setParams(context.params());             String json = JSON.toJSONString(aofEntry);                          log.debug("AOF 开始追加文件内容:{}" , json);             cachePersistAof.append(json);             log.debug("AOF 完成追加文件内容:{}" , json);         }     } } 
 
3.拦截器调用 当 AOF 的注解属性为 true 时,调用上述拦截器即可。
这里为了避免浪费,只有当持久化类为 AOF 模式时,才进行调用。
1 2 3 4 5 6 7 8 9 final  ICachePersist cachePersist = cache.persist();if (cacheInterceptor.aof() && (cachePersist instanceof  CachePersistAof)) {    if (before) {         persistInterceptors.before(interceptorContext);     } else  {         persistInterceptors.after(interceptorContext);     } } 
 
AOF持久化实现 这里的 AOF 模式和以前的 RDB 持久化类只是不同的模式,实际上二者是相同的接口。
1.接口 这里我们统一定义了不同的持久化类的时间,便于 RDB 与 AOF 不同任务的不同时间间隔触发。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public  interface  ICachePersist <K , V >  {         void  persist (final  ICache<K, V> cache)  ;          long  delay ()  ;          long  period ()  ;          TimeUnit timeUnit ()  ; } 
 
2.持久化类实现 实现一个 Buffer 列表,用于每次拦截器直接顺序添加
持久化的实现也比较简单,追加到文件之后,直接清空 buffer 列表即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public  class  CachePersistAof <K ,V > extends  CachePersistAdaptor <K ,V >  {    private  static  final  Log log = LogFactory.getLog(CachePersistAof.class);          private  final  List<String> bufferList = new  ArrayList<>();          private  final  String dbPath;     public  CachePersistAof (String dbPath)   {         this .dbPath = dbPath;     }          @Override      public  void  persist (ICache<K, V> cache)   {         log.info("开始 AOF 持久化到文件" );                  if (!FileUtil.exists(dbPath)) {             FileUtil.createFile(dbPath);         }                  FileUtil.append(dbPath, bufferList);                  bufferList.clear();         log.info("完成 AOF 持久化到文件" );     }     @Override      public  long  delay ()   {         return  1 ;     }     @Override      public  long  period ()   {         return  1 ;     }     @Override      public  TimeUnit timeUnit ()   {         return  TimeUnit.SECONDS;     }          public  void  append (final  String json)   {         if (StringUtil.isNotEmpty(json)) {             bufferList.add(json);         }     } } 
 
持久化测试 1.测试代码 1 2 3 4 5 6 7 ICache<String, String> cache = CacheBs.<String,String>newInstance()         .persist(CachePersists.<String, String>aof("1.aof" ))         .build(); cache.put("1" , "1" ); cache.expire("1" , 10 ); cache.remove("2" ); TimeUnit.SECONDS.sleep(1 ); 
 
2.测试日志 expire 实际上调用的是 expireAt。
1 2 3 4 5 6 7 8 9 10 11 [DEBUG] [2020 -10 -02  12 :20 :41.979 ] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 开始追加文件内容:{"methodName" :"put" ,"params" :["1" ,"1" ]} [DEBUG] [2020 -10 -02  12 :20 :41.980 ] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件内容:{"methodName" :"put" ,"params" :["1" ,"1" ]} [DEBUG] [2020 -10 -02  12 :20 :41.982 ] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 开始追加文件内容:{"methodName" :"expireAt" ,"params" :["1" ,1601612441990 ]} [DEBUG] [2020 -10 -02  12 :20 :41.982 ] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件内容:{"methodName" :"expireAt" ,"params" :["1" ,1601612441990 ]} [DEBUG] [2020 -10 -02  12 :20 :41.984 ] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 开始追加文件内容:{"methodName" :"remove" ,"params" :["2" ]} [DEBUG] [2020 -10 -02  12 :20 :41.984 ] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件内容:{"methodName" :"remove" ,"params" :["2" ]} [DEBUG] [2020 -10 -02  12 :20 :42.088 ] [pool-1 -thread-1 ] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: 1 , value: 1 , type: expire [INFO] [2020 -10 -02  12 :20 :42.789 ] [pool-2 -thread-1 ] [c.g.h.c.c.s.p.InnerCachePersist.run] - 开始持久化缓存信息 [INFO] [2020 -10 -02  12 :20 :42.789 ] [pool-2 -thread-1 ] [c.g.h.c.c.s.p.CachePersistAof.persist] - 开始 AOF 持久化到文件 [INFO] [2020 -10 -02  12 :20 :42.798 ] [pool-2 -thread-1 ] [c.g.h.c.c.s.p.CachePersistAof.persist] - 完成 AOF 持久化到文件 [INFO] [2020 -10 -02  12 :20 :42.799 ] [pool-2 -thread-1 ] [c.g.h.c.c.s.p.InnerCachePersist.run] - 完成持久化缓存信息 
 
3.文件内容 1.aof 的文件内容如下
1 2 3 {"methodName" :"put" ,"params" :["1" ,"1" ]} {"methodName" :"expireAt" ,"params" :["1" ,1601612441990 ]} {"methodName" :"remove" ,"params" :["2" ]} 
 
将每一次的操作,简单的存储到文件中
AOF 加载实现 1.加载 类似于 RDB 的加载模式,aof 的加载模式也是类似的。
我们需要根据文件的内容,还原以前的缓存的内容。
实现思路:遍历文件内容,反射调用原来的方法。
2.代码实现 解析文件 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override public  void  load (ICache<K, V> cache)   {    List<String> lines = FileUtil.readAllLines(dbPath);     log.info("[load] 开始处理 path: {}" , dbPath);     if (CollectionUtil.isEmpty(lines)) {         log.info("[load] path: {} 文件内容为空,直接返回" , dbPath);         return ;     }     for (String line : lines) {         if (StringUtil.isEmpty(line)) {             continue ;         }                           PersistAofEntry entry = JSON.parseObject(line, PersistAofEntry.class);         final  String methodName = entry.getMethodName();         final  Object[] objects = entry.getParams();         final  Method method = METHOD_MAP.get(methodName);                  ReflectMethodUtil.invoke(cache, method, objects);     } } 
 
方法映射的预加载 
Method 反射是固定的,为了提升性能,我们做一下预处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private  static  final  Map<String, Method> METHOD_MAP = new  HashMap<>();static  {    Method[] methods = Cache.class.getMethods();     for (Method method : methods){         CacheInterceptor cacheInterceptor = method.getAnnotation(CacheInterceptor.class);         if (cacheInterceptor != null ) {                          if (cacheInterceptor.aof()) {                 String methodName = method.getName();                 METHOD_MAP.put(methodName, method);             }         }     } } 
 
持久化加载测试 1.文件内容 
1 {"methodName" :"put" ,"params" :["1" ,"1" ]} 
 
2.测试 1 2 3 4 5 6 ICache<String, String> cache = CacheBs.<String,String>newInstance()         .load(CacheLoads.<String, String>aof("default.aof" ))         .build(); Assert.assertEquals(1 , cache.size()); System.out.println(cache.keySet()); 
 
直接将 default.aof 文件加载到 cache 缓存中
八.监听器的开发 下面我们将一起学习一下如何实现类似 guava-cache 中的 removeListener 删除监听器,和类似 redis 中的慢日志监控的 slowListener
删除监听器 :将数据驱除或过期时删除的数据记录打印到删除日志中
慢操作监听器 :当操作变慢时,将警告信息或报警信息打印到慢日志中
删除监听器的开发 1.基本介绍 我们在两种场景下删除数据是对用户透明的:
(1)size 满了之后,进行数据淘汰。
(2)expire 过期时,清除数据。
这两个特性对用户本来应该是无感的,不过用户如果关心的话,也可以通过添加删除监听器来获取到相关的变更信息
2.实现思路 为了实现删除的监听,我们需要找到删除的位置,然后调用监听器即可
    每次 put 数据时,都会校验 size 是否达到最大的限制,如果达到,则进行 evict 淘汰
    用户指定 expire 时间之后,回后台异步执行刷新
    也存在惰性删除的场景
3.接口定义 为了统一,我们将所有的删除都定义统一的接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public  interface  ICacheRemoveListener <K ,V >  {         void  listen (final  ICacheRemoveListenerContext<K,V> context)  ; } 
 
4.内置实现 系统内置的实现如下:
1 2 3 4 5 6 7 8 9 10 11 public  class  CacheRemoveListener <K ,V > implements  ICacheRemoveListener <K ,V >  {    private  static  final  Log log = LogFactory.getLog(CacheRemoveListener.class);     @Override      public  void  listen (ICacheRemoveListenerContext<K, V> context)   {         log.debug("Remove key: {}, value: {}, type: {}" ,                 context.key(), context.value(), context.type());     } } 
 
这个监听器是默认开启的,暂时无法关闭
5.自定义 用户可以自己的需要,进行自定义实现:
1 2 3 4 5 6 7 8 public  class  MyRemoveListener <K ,V > implements  ICacheRemoveListener <K ,V >  {    @Override      public  void  listen (ICacheRemoveListenerContext<K, V> context)   {         System.out.println("【删除提示】可恶,我竟然被删除了!"  + context.key());     } } 
 
6.使用测试 1 2 3 4 5 6 7 ICache<String, String> cache = CacheBs.<String,String>newInstance()         .size(1 )         .addRemoveListener(new  MyRemoveListener<String, String>())         .build(); cache.put("1" , "1" ); cache.put("2" , "2" ); 
 
我们指定 cache 的大小为1,设置我们自定义的删除监听器
这里的删除监听器可以添加多个
7.测试结果 测试日志如下:
1 2 [DEBUG] [2020 -09-30  19 :32 :54.617 ] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: 2 , value: 2 , type: evict 【删除提示】可恶,我竟然被删除了!2  
 
慢操作监听器开发 1.说明 redis 中会存储慢操作的相关日志信息,主要是由两个参数构成:
(1)slowlog-log-slower-than 预设阈值,它的单位是毫秒(1秒=1000000微秒)默认值是10000
(2)slowlog-max-len 最多存储多少条的慢日志记录
不过 redis 是直接存储到内存中,而且有长度限制。
根据实际工作体验,如果我们可以添加慢日志的监听,然后有对应的存储或者报警,这样更加方便问题的分析和快速反馈。
所以我们引入类似于删除的监听器。
2.实现思路 我们处理所有的 cache 操作,并且记录对应的操作耗时。
如果耗时操作用户设置的时间阈值,则调用慢操作监听器。
3.接口定义 为了保证接口的灵活性,每一个实现都可以定义自己的慢操作阈值,这样便于分级处理。
比如超过 100ms,用户可以选择输出 warn 日志;超过 1s,可能影响到业务了,可以直接接入报警系统。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public  interface  ICacheSlowListener   {         void  listen (final  ICacheSlowListenerContext context)  ;          long  slowerThanMills ()  ; } 
 
4.自定义监听器 实现接口 ICacheSlowListener
这里每一个监听器都可以指定自己的慢日志阈值,便于分级处理
1 2 3 4 5 6 7 8 9 10 11 12 13 public  class  MySlowListener  implements  ICacheSlowListener   {    @Override      public  void  listen (ICacheSlowListenerContext context)   {         System.out.println("【慢日志】name: "  + context.methodName());     }     @Override      public  long  slowerThanMills ()   {         return  0 ;     } } 
 
5.使用测试 1 2 3 4 5 6 ICache<String, String> cache = CacheBs.<String,String>newInstance()         .addSlowListener(new  MySlowListener())         .build(); cache.put("1" , "2" ); cache.get("1" ); 
 
6.测试结果 1 2 3 4 5 6 [DEBUG] [2020 -09-30  17 :40 :11.547 ] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.before] - Cost start, method: put [DEBUG] [2020 -09-30  17 :40 :11.551 ] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.after] - Cost end, method: put, cost: 10ms 【慢日志】name: put [DEBUG] [2020 -09-30  17 :40 :11.554 ] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.before] - Cost start, method: get [DEBUG] [2020 -09-30  17 :40 :11.554 ] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.after] - Cost end, method: get, cost: 1ms 【慢日志】name: get 
 
实际工作中,我们可以针对慢日志数据存储,便于后期分析。
也可以直接接入报警系统,及时反馈问题
九.Redis 渐进式 Rehash详解 HashMap 简介 1.HashMap 的 rehash 读过 HashMap 源码的同学,应该都知道 map 在扩容的时候,有一个 rehash 的过程
2.HashMap 的扩容简介 这里简单介绍下:
扩容(resize)就是重新计算容量,向HashMap对象里不停的添加元素,而HashMap对象内部的数组无法装载更多的元素时,对象就需要扩大数组的长度,以便能装入更多的元素
当然Java里的数组是无法自动扩容的,方法是使用一个新的数组代替已有的容量小的数组,就像我们用一个小桶装水,如果想装更多的水,就得换大水桶
Redis 中的扩容设计 HashMap 的扩容需要对集合中大部分的元素进行重新计算,但是对于 redis 这种企业级应用,特别是单线程的应用,如果像传统的 rehash 一样把所有元素来一遍的话,估计要十几秒的时间。
十几秒对于常见的金融、电商等相对高并发的业务场景,是无法忍受的。
那么 redis 的 rehash 是如何实现的呢?
实际上 redis 的 rehash 动作并不是一次性、集中式地完成的, 而是分多次、渐进式地完成的 。
这里补充一点,不单单是扩容,缩容也是一样的道理,二者都需要进行 rehash。
只增不降就是对内存的浪费,浪费就是犯罪,特别是内存还这么贵。
ps: 这种思想和 key 淘汰有异曲同工之妙,一口吃不了一个大胖子,一次搞不定,那就 1024 次,慢慢来总能解决问题
Redis 的渐进式 rehash 这部分直接选自经典入门书籍《Redis 设计与实现》
1.为什么要渐进式处理? 实际上 redis 内部有两个 hashtable,我们称之为 ht[0] 和 ht[1]。传统的 HashMap 中只有一个。
为了避免 rehash 对服务器性能造成影响, 服务器不是一次性将 ht[0] 里面的所有键值对全部 rehash 到 ht[1] , 而是分多次、渐进式地将 ht[0] 里面的键值对慢慢地 rehash 到 ht[1] 。
2.详细步骤 哈希表渐进式 rehash 的详细步骤:
(1)为 ht[1] 分配空间, 让字典同时持有 ht[0] 和 ht[1] 两个哈希表。
(2)在字典中维持一个索引计数器变量 rehashidx , 并将它的值设置为 0 , 表示 rehash 工作正式开始。
(3)在 rehash 进行期间, 每次对字典执行添加、删除、查找或者更新操作时, 程序除了执行指定的操作以外, 还会顺带将 ht[0] 哈希表在 rehashidx 索引上的所有键值对 rehash 到 ht[1] , 当 rehash 工作完成之后, 程序将 rehashidx 属性的值增1。
(4)随着字典操作的不断执行, 最终在某个时间点上, ht[0] 的所有键值对都会被 rehash 至 ht[1] , 这时程序将 rehashidx 属性的值设为 -1 , 表示 rehash 操作已完成。
渐进式 rehash 的好处在于它采取分而治之的方式, 将 rehash 键值对所需的计算工作均滩到对字典的每个添加、删除、查找和更新操作上, 从而避免了集中式 rehash 而带来的庞大计算量。
3.rehash 间的操作怎么兼容呢? 因为在进行渐进式 rehash 的过程中, 字典会同时使用 ht[0] 和 ht[1] 两个哈希表, 那这期间的操作如何保证正常进行呢?
(1)查询一个信息
这个类似于我们的数据库信息等迁移,先查询一个库,没有的话,再去查询另一个库。
ht[0] 中没找到,我们去 ht[1] 中查询即可。
(2)新数据怎么办?
这个和数据迁移一样的道理。
当我们有新旧的两个系统时,新来的用户等信息直接落在新系统即可,
这一措施保证了 ht[0] 包含的键值对数量会只减不增, 并随着 rehash 操作的执行而最终变成空表。
扩容 1.什么时候判断? redis 在每次执行 put 操作的时候,就可以检查是否需要扩容。
其实也很好理解,put 插入元素的时候,判断是否需要扩容,然后开始扩容,是直接的一种思路。
留一个思考题:我们可以在其他的时候判断吗?
2.redis 判断是否需要扩容的源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 static  int  _dictExpandIfNeeded (dict *d)  {              if  (dictIsRehashing(d)) return  DICT_OK;               if  (d->ht[0 ].size == 0 ) return  dictExpand(d, DICT_HT_INITIAL_SIZE);               if  (d->ht[0 ].used >= d->ht[0 ].size &&         (dict_can_resize ||          d->ht[0 ].used/d->ht[0 ].size > dict_force_resize_ratio))     {         return  dictExpand(d, d->ht[0 ].used*2 );     }     return  DICT_OK; } 
 
扩容的条件总结下来就是两句话:
(1)服务器目前没有在执行 BGSAVE/BGREWRITEAOF 命令, 并且哈希表的负载因子大于等于 1;
(2)服务器目前正在执行 BGSAVE/BGREWRITEAOF 命令, 并且哈希表的负载因子大于等于 5;
这里其实体现了作者的一种设计思想:如果负载因子超过5,说明信息已经很多了,管你在不在保存,都要执行扩容,优先保证服务可用性。如果没那么高,那就等持久化完成再做 rehash。
我们自己在实现的时候可以简化一下,比如只考虑情况2。
3.扩容到原来的多少? 知道了什么时候应该开始扩容,但是要扩容到多大也是值得思考的一个问题。
扩容的太小,会导致频繁扩容,浪费性能。
扩容的太大,会导致资源的浪费。
其实这个最好的方案是结合我们实际的业务,不过这部分对用户是透明的。
一般是扩容为原来的两倍。
4.为什么需要扩容? 我们在实现 ArrayList 的时候需要扩容,因为数据放不下了。
我们知道 HashMap 的底层是数组 + 链表(红黑树)的数据结构。
那么会存在放不下的情况吗?
个人理解实际上不会。因为链表可以一直加下去。
那为什么需要扩容呢?
实际上更多的是处于性能的考虑。我们使用 HashMap 就是为了提升性能,如果一直不扩容,可以理解为元素都 hash 到相同的 bucket 上,这时就退化成了一个链表。
这会导致查询等操作性能大大降低。
缩容 1.什么时候判断? 看了前面的扩容,我们比较直观地方式是在用户 remove 元素的时候执行是否需要缩容。
不过 redis 并不完全等同于传统的 HashMap,还有数据的淘汰和过期,这些是对用户透明的。
redis 采用的方式实际上是一个定时任务。
个人理解内存缩容很重要,但是没有那么紧急,我们可以 1min 扫描一次,这样可以节省机器资源。
实际工作中,一般 redis 的内存都是逐步上升的,或者稳定在一个范围内,很少去大批量删除数据。(除非数据搞错了,我就遇到过一次,数据同步错地方了)。
所以数据删除,一般几分钟内给用户一个反馈就行。
知其然,知其所以然。
我们懂得了这个道理也就懂得了为什么有时候删除 redis 的几百万 keys,内存也不是直接降下来的原因。
2.缩容的条件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 void  tryResizeHashTables (int  dbid)   {    if  (htNeedsResize(server.db[dbid].dict))         dictResize(server.db[dbid].dict);     if  (htNeedsResize(server.db[dbid].expires))         dictResize(server.db[dbid].expires); } #define HASHTABLE_MIN_FILL        10      /* Minimal hash table fill 10% */ int  htNeedsResize (dict *dict)   {    long  long  size, used;     size = dictSlots(dict);     used = dictSize(dict);     return  (size > DICT_HT_INITIAL_SIZE &&             (used*100 /size < HASHTABLE_MIN_FILL)); } int  dictResize (dict *d)  {    int  minimal;     if  (!dict_can_resize || dictIsRehashing(d)) return  DICT_ERR;     minimal = d->ht[0 ].used;     if  (minimal < DICT_HT_INITIAL_SIZE)         minimal = DICT_HT_INITIAL_SIZE;     return  dictExpand(d, minimal); } 
 
和扩容类似,不过这里的缩容比例不是 5 倍,而是当哈希表保存的key数量与哈希表的大小的比例小于 10% 时需要缩容。
3.缩容到多少? 最简单的方式是直接变为原来的一半,不过这么做有时候也不是那么好用。
redis 是缩容后的大小为第一个大于等于当前key数量的2的n次方。 
这个可能不太好理解,举几个数字就懂了:
keys数量 
缩容大小 
 
 
3 
4 
 
4 
4 
 
5 
8 
 
9 
16 
 
主要保障以下3点:
(1)缩容之后,要大于等于 key 的数量
(2)尽可能的小,节约内存
(3)2 的倍数。
第三个看过 HashMap 源码讲解的小伙伴应该深有体会。
当然也不能太小,redis 限制的最小为 4。
实际上如果 redis 中只放 4 个 key,实在是杀鸡用牛刀,一般不会这么小。
我们在实现的时候,直接参考 jdk 好了,给个最小值限制 8。
4.为什么需要缩容? 最核心的目的就是为了节约内存,其实还有一个原因,叫 small means fast(小即是快——老马)。
渐进式 ReHash 实现的思考 好了,扩容和缩容就聊到这里,那么这个渐进式 rehash 到底怎么一个渐进法?
1.扩容前 不需要扩容时应该有至少需要初始化两个元素:
1 2 3 4 hashtable[0 ] = new  HashTable(size); hashIndex=-1 ; hashtable[1 ] = null ; 
 
hashtable 中存储着当前的元素信息,hashIndex=-1 标识当前没有在进行扩容。
2.扩容准备 当需要扩容的时候,我们再去创建一个 hashtable[1],并且 size 是原来的 2倍。
1 2 3 4 5 hashtable[0 ] = new  HashTable(size); hashtable[1 ] = new  HashTable(2  * size); hashIndex=-1 ; 
 
主要是为了节约内存,使用惰性初始化的方式创建 hashtable。
3.扩容时 调整 hashIndex=0…size,逐步去 rehash 到新的 hashtable[1]
新的插入全部放入到 hashtable[1]
4.扩容后 扩容后我们应该把 hashtable[0] 的值更新为 hashtable[1],并且释放掉 hashtable[1] 的资源。
并且设置 hashIndex=-1,标识已经 rehash 完成
1 2 3 4 hashtable[0 ] = hashtable[1 ]; hashIndex=-1 ; hashtable[1 ] = null ; 
 
这样整体的实现思路就已经差不多了,光说不练假把式,我们下一节就来自己实现一个渐进式 rehash 的 HashMap
十.实现渐进式ReHash Map类定义 类定义 1 2 3 4 5 6 7 8 public  class  MyProgressiveReHashMap <K ,V > extends  AbstractMap <K ,V > implements  Map <K ,V >  {} 
 
和简易版本类似。
私有变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 private  static  final  Log log = LogFactory.getLog(MyProgressiveReHashMap.class);private  int  rehashIndex = -1 ;private  int  capacity;private  int  rehashCapacity;private  int  size = 0 ;private  final  double  factor = 1.0 ;private  List<List<Entry<K, V>>> table;private  List<List<Entry<K, V>>> rehashTable;private  boolean  debugMode = false ;
 
rehashIndex/rehashCapacity/rehashTable 这三个值都是我们在进行渐进式实现的时候需要使用的值。
构造器 主要是一些值的初始化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public  MyProgressiveReHashMap ()   {    this (8 ); } public  MyProgressiveReHashMap (int  capacity)   {    this (capacity, false ); } public  MyProgressiveReHashMap (int  capacity, boolean  debugMode)   {    this .capacity = capacity;          this .table = new  ArrayList<>(capacity);          for (int  i = 0 ; i < capacity; i++) {         this .table.add(i, new  ArrayList<Entry<K, V>>());     }     this .debugMode = debugMode;     this .rehashIndex = -1 ;     this .rehashCapacity = -1 ;     this .rehashTable = null ; } 
 
put() 方法 这个方法相对难度比较大:
put() 的过程可以见方法的注释。
需要考虑是否为 rehash 阶段,还需要考虑是否为更新。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 @Override public  V put (K key, V value)   {    boolean  isInRehash = isInReHash();     if (!isInRehash) {                  Pair<Boolean, V> pair = updateTableInfo(key, value, this .table, this .capacity);         if (pair.getValueOne()) {             V oldVal = pair.getValueTwo();             if (debugMode) {                 log.debug("不处于渐进式 rehash,此次为更新操作。key: {}, value: {}" , key, value);                 printTable(this .table);             }             return  oldVal;         } else  {                          return  this .createNewEntry(key, value);         }     } else  {                  if (debugMode) {             log.debug("当前处于渐进式 rehash 阶段,额外执行一次渐进式 rehash 的动作" );         }         rehashToNew();                  Pair<Boolean, V> pair = updateTableInfo(key, value, this .table, this .capacity);         if (pair.getValueOne()) {             V oldVal = pair.getValueTwo();             if (debugMode) {                 log.debug("此次为更新 table 操作。key: {}, value: {}" , key, value);                 printTable(this .table);             }             return  oldVal;         }                  Pair<Boolean, V> pair2 = updateTableInfo(key, value, this .rehashTable, this .rehashCapacity);         if (pair2.getValueOne()) {             V oldVal = pair2.getValueTwo();             if (debugMode) {                 log.debug("此次为更新 rehashTable 操作。key: {}, value: {}" , key, value);                 printTable(this .table);             }             return  oldVal;         }                  return  this .createNewEntry(key, value);     } } 
 
1.是否为 rehash 阶段 这个实现比较简单,就是判断 rehashIndex 是否为 -1:
1 2 3 4 5 6 7 private  boolean  isInReHash ()   {    return  rehashIndex != -1 ; } 
 
2.更新列表信息 这里为了复用,对方法进行了抽象。可以同时使用到 table 和 rehashTable 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 private  Pair<Boolean, V> updateTableInfo (K key, V value, final  List<List<Entry<K,V>>> table,                              final  int  tableCapacity)   {         int  hash = HashUtil.hash(key);     int  index = HashUtil.indexFor(hash, tableCapacity);          List<Entry<K,V>> entryList = new  ArrayList<>();     if (index < table.size()) {         entryList = table.get(index);     }          for (Entry<K,V> entry : entryList) {                  final  K entryKey = entry.getKey();         if (ObjectUtil.isNull(key, entryKey)                 || key.equals(entryKey)) {             final  V oldValue = entry.getValue();                          entry.setValue(value);             return  Pair.of(true , oldValue);         }     }     return  Pair.of(false , null ); } 
 
这个和以前基本是类似的。
返回结果时,为了同时保存是否为更新,以及更新的 value 值。所以使用了 Pair 工具类。
3.插入新的元素 插入方法也比较麻烦,需要区分是否处于渐进式 rehash 阶段。还要考虑是否需要扩容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 private  V createNewEntry (final  K key,                             final  V value)   {    Entry<K,V> entry = new  DefaultMapEntry<>(key, value);          int  hash = HashUtil.hash(key);          if (isInReHash()) {         int  index = HashUtil.indexFor(hash, this .rehashCapacity);         List<Entry<K,V>> list = this .rehashTable.get(index);         list.add(entry);         if (debugMode) {             log.debug("目前处于 rehash 中,元素直接插入到 rehashTable 中。" );             printTable(this .rehashTable);         }     }                    if (isNeedExpand()) {         rehash();                  int  index = HashUtil.indexFor(hash, this .rehashCapacity);         List<Entry<K,V>> list = this .rehashTable.get(index);         list.add(entry);         if (debugMode) {             log.debug("目前处于 rehash 中,元素直接插入到 rehashTable 中。" );             printTable(this .rehashTable);         }     } else  {         int  index = HashUtil.indexFor(hash, this .capacity);         List<Entry<K,V>> list = this .table.get(index);         list.add(entry);         if (debugMode) {             log.debug("目前不处于 rehash 中,元素直接插入到 table 中。" );             printTable(this .table);         }     }     this .size++;     return  value; } 
 
是否需要扩容的方法也比较简单:
1 2 3 4 5 6 7 8 9 10 11 private  boolean  isNeedExpand ()   {         double  rate = size*1.0  / capacity*1.0 ;     return  rate >= factor && !isInReHash(); } 
 
不过我们这次添加了一个不要处于渐进式 rehash 过程中。
其中 rehash 的实现也发生了很大的变化,具体实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private  void  rehash ()   {    if (isInReHash()) {         if (debugMode) {             log.debug("当前处于渐进式 rehash 阶段,不重复进行 rehash!" );         }         return ;     }          this .rehashIndex = -1 ;     this .rehashCapacity = 2 *capacity;     this .rehashTable = new  ArrayList<>(this .rehashCapacity);     for (int  i = 0 ; i < rehashCapacity; i++) {         rehashTable.add(i, new  ArrayList<Entry<K, V>>());     }          rehashToNew(); } 
 
渐进式更新的方法,可以在 get/put/remove 等操作时,执行附加操作时使用。
所以单独抽成了一个方法,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 private  void  rehashToNew ()   {    rehashIndex++;     List<Entry<K, V>> list = table.get(rehashIndex);     for (Entry<K, V> entry : list) {         int  hash = HashUtil.hash(entry);         int  index = HashUtil.indexFor(hash, rehashCapacity);                           List<Entry<K,V>> newList = rehashTable.get(index);                           newList.add(entry);         rehashTable.set(index, newList);     }          table.set(rehashIndex, new  ArrayList<Entry<K, V>>());               if (rehashIndex == (table.size()-1 )) {         this .capacity = this .rehashCapacity;         this .table = this .rehashTable;         this .rehashIndex = -1 ;         this .rehashCapacity = -1 ;         this .rehashTable = null ;         if (debugMode) {             log.debug("渐进式 rehash 已经完成。" );             printTable(this .table);         }p     } else  {         if (debugMode) {             log.debug("渐进式 rehash 处理中, 目前 index:{} 已完成" , rehashIndex);             printAllTable();         }     } } 
 
get() 操作 渐进式 rehash 将动作分散到每一个操作中,我们对 get 方法进行重写,当做一个例子。其他的方法如果实现也是类似的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Override public  V get (Object key)   {    if (isInReHash()) {         if (debugMode) {             log.debug("当前处于渐进式 rehash 状态,额外执行一次操作" );             rehashToNew();         }     }          V result = getValue(key, this .table);     if (result != null ) {         return  result;     }          if (isInReHash()) {         return  getValue(key, this .rehashTable);     }     return  null ; } 
 
测试 我们历经千辛万苦,终于实现了一个简单版本的渐进式 hash map。
下面来测试一下功能是否符合我们的预期。
1.put 操作 1 2 3 Map<String, String> map = new  MyProgressiveReHashMap<>(2 , true ); map.put("1" , "1" ); map.put("1" , "2" ); 
 
日志:
1 2 3 4 [DEBUG] [2020 -10 -11  21 :30 :15.072 ] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.createNewEntry] - 目前不处于 rehash 中,元素直接插入到 table 中。 {1 : 1 }  [DEBUG] [2020 -10 -11  21 :30 :15.076 ] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.put] - 不处于渐进式 rehash,此次为更新操作。key: 1 , value: 2  {1 : 2 }  
 
第一次是插入,第二次是更新。
这里都没有触发扩容,下面我们看一下触发扩容的情况。
2.扩容测试 1 2 3 4 5 6 7 8 Map<String, String> map = new  MyProgressiveReHashMap<>(2 , true ); map.put("1" , "1" ); map.put("2" , "2" ); map.put("3" , "3" ); Assert.assertEquals("1" , map.get("1" )); Assert.assertEquals("2" , map.get("2" )); Assert.assertEquals("3" , map.get("3" )); 
 
日志如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 [DEBUG] [2020 -10 -11  21 :31 :12.559 ] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.createNewEntry] - 目前不处于 rehash 中,元素直接插入到 table 中。 {1 : 1 }  [DEBUG] [2020 -10 -11  21 :31 :12.560 ] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.createNewEntry] - 目前不处于 rehash 中,元素直接插入到 table 中。 {2 : 2 }  {1 : 1 }  [DEBUG] [2020 -10 -11  21 :31 :12.563 ] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.rehashToNew] - 渐进式 rehash 处理中, 目前 index:0  已完成 原始 table 信息:  {1 : 1 }  新的 table 信息:  {2 : 2 }  [DEBUG] [2020 -10 -11  21 :31 :12.563 ] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.createNewEntry] - 目前处于 rehash 中,元素直接插入到 rehashTable 中。 {2 : 2 }  {3 : 3 }  [DEBUG] [2020 -10 -11  21 :31 :12.564 ] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.get] - 当前处于渐进式 rehash 状态,额外执行一次操作 [DEBUG] [2020 -10 -11  21 :31 :12.564 ] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.rehashToNew] - 渐进式 rehash 已经完成。 {2 : 2 }  {1 : 1 }  {3 : 3 }  
 
当放入元素【3】的时候,已经触发了 rehash。
(1)第一次渐进式 rehash 将 table[0] 的元素 rehash 到了新的节点。
(2)插入的元素直接插入到 rehashTable 中
(3)get 操作时,额外触发一次 rehash,然后所有的 rehash 已经完成。
参考资料 Fluent Interface 流式接口