0%

Java|渐进式本地缓存开发总结

img

一.固定大小缓存实现

缓存接口定义

为了兼容 Map,我们定义缓存接口继承自 Map 接口。

1
2
3
4
5
/**
* 缓存接口
*/
public interface ICache<K, V> extends Map<K, V> {
}

核心实现

我们主要看一下 put 时的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public V put(K key, V value) {
//1.1 尝试驱除
CacheEvictContext<K,V> context = new CacheEvictContext<>();
context.key(key).size(sizeLimit).cache(this);
// context包含新的键和旧的缓存,放入驱除类中按规则淘汰
cacheEvict.evict(context);
//2. 判断驱除后的信息
if(isSizeLimit()) {
throw new CacheRuntimeException("当前队列已满,数据添加失败!");
}
//3. 执行添加
return map.put(key, value);
}

这里我们可以让用户动态指定大小,但是指定大小肯就要有对应的淘汰策略。

否则,固定大小的 map 肯定无法放入元素。

淘汰策略

淘汰策略可以有多种,比如 LRU/LFU/FIFO 等等,我们此处实现一个最基本的 FIFO。

所有实现以接口的方式实现,便于后期灵活替换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class CacheEvictFIFO<K,V> implements ICacheEvict<K,V> {

/**
* queue 信息
*/
private Queue<K> queue = new LinkedList<>();

@Override
public void evict(ICacheEvictContext<K, V> context) {
final ICache<K,V> cache = context.cache();
// 超过限制,执行移除
if(cache.size() >= context.size()) {
K evictKey = queue.remove();
// 移除最开始的元素
cache.remove(evictKey);
}

// 将新加的元素放入队尾
final K key = context.key();
queue.add(key);
}

}

FIFO 比较简单,我们使用一个队列,存储每一次放入的元素,当队列超过最大限制时,删除最早的元素。

引导类

为了便于用户使用,我们实现类似于 guava 的引导类。

所有参数都提供默认值,使用 fluent 流式写法,提升用户体验。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
* 缓存引导类
*/
public final class CacheBs<K,V> {

private CacheBs(){}

/**
* 创建对象实例
* @param <K> key
* @param <V> value
* @return this
*/
public static <K,V> CacheBs<K,V> newInstance() {
return new CacheBs<>();
}

/**
* map 实现
*/
private Map<K,V> map = new HashMap<>();

/**
* 大小限制
*/
private int size = Integer.MAX_VALUE;

/**
* 驱除策略默认设置
*/
private ICacheEvict<K,V> evict = CacheEvicts.fifo();

/**
* map 实现
* @param map map
* @return this
*/
public CacheBs<K, V> map(Map<K, V> map) {
ArgUtil.notNull(map, "map");

this.map = map;
return this;
}

/**
* 设置 size 信息
* @param size size
* @return this
*/
public CacheBs<K, V> size(int size) {
ArgUtil.notNegative(size, "size");

this.size = size;
return this;
}

/**
* 设置驱除策略
* @param evict 驱除策略
* @since 0.0.2
*/
public CacheBs<K, V> evict(ICacheEvict<K, V> evict) {
this.evict = evict;
return this;
}

/**
* 构建缓存信息
* @return 缓存信息
*/
public ICache<K,V> build() {
CacheContext<K,V> context = new CacheContext<>();
context.cacheEvict(evict);
context.map(map);
context.size(size);

return new Cache<>(context);
}

}

测试使用

1
2
3
4
5
6
7
8
9
ICache<String, String> cache = CacheBs.<String,String>newInstance()
.size(2)
.build();
cache.put("1", "1");
cache.put("2", "2");
cache.put("3", "3");
cache.put("4", "4");
Assert.assertEquals(2, cache.size());
System.out.println(cache.keySet());

默认为先进先出的策略,此时输出 keys,内容如下:

1
[3, 4]

二.LRU缓存淘汰策略

上面默认使用FIFO淘汰策略即先进先淘汰,下我们来开发LRU缓存淘汰策略即淘汰最近最少使用

LRU基本原理

1.LRU 是什么

LRU 是由 Least Recently Used 的首字母组成,表示最近最少使用的含义,一般使用在对象淘汰算法上。

也是比较常见的一种淘汰算法。

其核心思想是如果数据最近被访问过,那么将来被访问的几率也更高

2.连续性

在计算机科学中,有一个指导准则:连续性准则。

时间连续性:对于信息的访问,最近被访问过,被再次访问的可能性会很高。缓存就是基于这个理念进行数据淘汰的。

空间连续性:对于磁盘信息的访问,将很有可能访问连续的空间信息。所以会有 page 预取来提升性能。

3.实现步骤
  1. 新数据插入到链表头部;
  2. 每当缓存命中(即缓存数据被访问),则将数据移到链表头部;
  3. 当链表满的时候,将链表尾部的数据丢弃。

其实比较简单,比起 FIFO 的队列,我们引入一个链表实现即可。

5.LRU实现步骤疑点

我们针对上面的 3 句话,逐句考虑一下,看看有没有值得优化点或者一些坑。

如何判断是新数据?

新数据插入到链表头部;

我们使用的是链表。

判断新数据最简单的方法就是遍历是否存在,对于链表,这是一个 O(n) 的时间复杂度。

其实性能还是比较差的。

当然也可以考虑空间换时间,比如引入一个 set 之类的,不过这样对空间的压力会加倍。

什么是缓存命中

每当缓存命中(即缓存数据被访问),则将数据移到链表头部;

put(key,value) 的情况,就是新元素。如果已有这个元素,可以先删除,再加入,参考上面的处理。

get(key) 的情况,对于元素访问,删除已有的元素,将新元素放在头部。

remove(key) 移除一个元素,直接删除已有元素。

keySet() valueSet() entrySet() 这些属于无差别访问,我们不对队列做调整。

移除

当链表满的时候,将链表尾部的数据丢弃。

链表满只有一种场景,那就是添加元素的时候,也就是执行 put(key, value) 的时候。

直接删除对应的 key 即可。

Java 代码实现

1.接口定义

和 FIFO 的接口保持一致,调用地方也不变

为了后续 LRU/LFU 实现,新增 remove/update 两个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public interface ICacheEvict<K, V> {

/**
* 驱除策略
*
* @param context 上下文
* @return 是否执行驱除
*/
boolean evict(final ICacheEvictContext<K, V> context);

/**
* 更新 key 信息
* @param key key
*/
void update(final K key);

/**
* 删除 key 信息
* @param key key
*/
void remove(final K key);

}
2.LRU 实现

直接基于 LinkedList 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 丢弃策略-LRU 最近最少使用
*/
public class CacheEvictLRU<K,V> implements ICacheEvict<K,V> {

private static final Log log = LogFactory.getLog(CacheEvictLRU.class);

/**
* list 信息
*/
private final List<K> list = new LinkedList<>();

@Override
public boolean evict(ICacheEvictContext<K, V> context) {
boolean result = false;
final ICache<K,V> cache = context.cache();
// 超过限制,移除队尾的元素
if(cache.size() >= context.size()) {
K evictKey = list.get(list.size()-1);
// 移除对应的元素
cache.remove(evictKey);
result = true;
}
return result;
}


/**
* 放入元素
* (1)删除已经存在的
* (2)新元素放到元素头部
*
* @param key 元素
*/
@Override
public void update(final K key) {
this.list.remove(key);
this.list.add(0, key);
}

/**
* 移除元素
* @param key 元素
*/
@Override
public void remove(final K key) {
this.list.remove(key);
}

}

实现比较简单,相对 FIFO 多了三个方法:

update():我们做一点简化,认为只要是访问,就是删除,然后插入到队首。

remove():删除就是直接删除。

这三个方法是用来更新最近使用情况的。

那什么时候调用呢?

3.注解属性

为了保证核心流程,我们基于注解实现。

添加属性:

1
2
3
4
5
6
7
/**
* 是否执行驱除更新
*
* 主要用于 LRU/LFU 等驱除策略
* @return 是否
*/
boolean evict() default false;
4.注解使用

有哪些方法需要使用?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
@CacheInterceptor(refresh = true, evict = true)
public boolean containsKey(Object key) {
return map.containsKey(key);
}

@Override
@CacheInterceptor(evict = true)
@SuppressWarnings("unchecked")
public V get(Object key) {
//1. 刷新所有过期信息
K genericKey = (K) key;
this.expire.refreshExpire(Collections.singletonList(genericKey));
return map.get(key);
}

@Override
@CacheInterceptor(aof = true, evict = true)
public V put(K key, V value) {
//...
}

@Override
@CacheInterceptor(aof = true, evict = true)
public V remove(Object key) {
return map.remove(key);
}
5.注解驱除拦截器实现

执行顺序:放在方法之后更新,不然每次当前操作的 key 都会被放在最前面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 驱除策略拦截器
*/
public class CacheInterceptorEvict<K,V> implements ICacheInterceptor<K, V> {

private static final Log log = LogFactory.getLog(CacheInterceptorEvict.class);

@Override
public void before(ICacheInterceptorContext<K,V> context) {
}

@Override
@SuppressWarnings("all")
public void after(ICacheInterceptorContext<K,V> context) {
ICacheEvict<K,V> evict = context.cache().evict();

Method method = context.method();
final K key = (K) context.params()[0];
if("remove".equals(method.getName())) {
evict.remove(key);
} else {
evict.update(key);
}
}

}

我们只对 remove 方法做下特判,其他方法都使用 update 更新信息。

参数直接取第一个参数。

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ICache<String, String> cache = CacheBs.<String,String>newInstance()
.size(3)
.evict(CacheEvicts.<String, String>lru())
.build();
cache.put("A", "hello");
cache.put("B", "world");
cache.put("C", "FIFO");

// 访问一次A
cache.get("A");
cache.put("D", "LRU");
Assert.assertEquals(3, cache.size());

System.out.println(cache.keySet());
  • 日志信息
1
[D, A, C]

通过 removeListener 日志也可以看到 B 被移除了:

1
[DEBUG] [2020-10-02 21:33:44.578] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key:

三.LRU缓存策略的优化

数据结构选择

1.基于数组

方案:为每一个数据附加一个额外的属性——时间戳,当每一次访问数据时,更新该数据的时间戳至当前时间。

当数据空间已满后,则扫描整个数组,淘汰时间戳最小的数据。

不足:维护时间戳需要耗费额外的空间,淘汰数据时需要扫描整个数组。

这个时间复杂度太差,空间复杂度也不好。

2.基于长度有限的双向链表

方案:访问一个数据时,当数据不在链表中,则将数据插入至链表头部,如果在链表中,则将该数据移至链表头部。当数据空间已满后,则淘汰链表最末尾的数据。

不足:插入数据或取数据时,需要扫描整个链表。

这个就是我们上一节实现的方式,缺点还是很明显,每次确认元素是否存在,都要消耗 O(n) 的时间复杂度去查询。

3.基于双向链表和哈希表

方案:为了改进上面需要扫描链表的缺陷,配合哈希表,将数据和链表中的节点形成映射,将插入操作和读取操作的时间复杂度从O(N)降至O(1)

缺点:这个使我们上一节提到的优化思路,不过还是有缺点的,那就是空间复杂度翻倍。

4.数据结构的选择总结

(1)基于数组的实现

这里不建议选择 array 或者 ArrayList,因为读取的时间复杂度为 O(1),但是更新相对是比较慢的,虽然 jdk 使用的是 System.arrayCopy。

(2)基于双向链表的实现

如果我们选择链表,HashMap 中还是不能简单的存储 key, 和对应的下标。

因为链表的遍历,实际上还是 O(n) 的,双向链表理论上可以优化一半,但是这并不是我们想要的 O(1) 效果。

(3)基于双向链表 + Map实现

双向链表我们保持不变。

Map 中 key 对应的值我们放双向链表的节点信息。

那实现方式就变成了实现一个双向链表。

基于自定义双向链表实现

1.节点定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 双向链表节点
* @param <K> key
* @param <V> value
*/
public class DoubleListNode<K,V> {

/**
* 键
*/
private K key;

/**
* 值
*/
private V value;

/**
* 前一个节点
*/
private DoubleListNode<K,V> pre;

/**
* 后一个节点
*/
private DoubleListNode<K,V> next;

//fluent get & set
}
2.核心代码实现

我们保持和原来的接口不变,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
public class CacheEvictLruDoubleListMap<K,V> extends AbstractCacheEvict<K,V> {

private static final Log log = LogFactory.getLog(CacheEvictLruDoubleListMap.class);


/**
* 头结点
*/
private DoubleListNode<K,V> head;

/**
* 尾巴结点
*/
private DoubleListNode<K,V> tail;

/**
* map 信息
* key: 元素信息
* value: 元素在 list 中对应的节点信息
*/
private Map<K, DoubleListNode<K,V>> indexMap;

public CacheEvictLruDoubleListMap() {
this.indexMap = new HashMap<>();
this.head = new DoubleListNode<>();
this.tail = new DoubleListNode<>();

this.head.next(this.tail);
this.tail.pre(this.head);
}

@Override
protected ICacheEntry<K, V> doEvict(ICacheEvictContext<K, V> context) {
ICacheEntry<K, V> result = null;
final ICache<K,V> cache = context.cache();
// 超过限制,移除队尾的元素
if(cache.size() >= context.size()) {
// 获取尾巴节点的前一个元素
DoubleListNode<K,V> tailPre = this.tail.pre();
if(tailPre == this.head) {
log.error("当前列表为空,无法进行删除");
throw new CacheRuntimeException("不可删除头结点!");
}

K evictKey = tailPre.key();
V evictValue = cache.remove(evictKey);
result = new CacheEntry<>(evictKey, evictValue);
}

return result;
}


/**
* 放入元素
*
* (1)删除已经存在的
* (2)新元素放到元素头部
* @param key 元素
*/
@Override
public void update(final K key) {
//1. 执行删除
this.remove(key);

//2. 新元素插入到头部
//head<->next
//变成:head<->new<->next
DoubleListNode<K,V> newNode = new DoubleListNode<>();
newNode.key(key);

DoubleListNode<K,V> next = this.head.next();
this.head.next(newNode);
newNode.pre(this.head);
next.pre(newNode);
newNode.next(next);

//2.2 插入到 map 中
indexMap.put(key, newNode);
}

/**
* 移除元素
*
* 1. 获取 map 中的元素
* 2. 不存在直接返回,存在执行以下步骤:
* 2.1 删除双向链表中的元素
* 2.2 删除 map 中的元素
* @param key 元素
*/
@Override
public void remove(final K key) {
DoubleListNode<K,V> node = indexMap.get(key);

if(ObjectUtil.isNull(node)) {
return;
}

// 删除 list node
// A<->B<->C
// 删除 B,需要变成:A<->C
DoubleListNode<K,V> pre = node.pre();
DoubleListNode<K,V> next = node.next();

pre.next(next);
next.pre(pre);

// 删除 map 中对应信息
this.indexMap.remove(key);
}

}

实现起来不难,就是一个简易版本的双向列表。

只是获取节点的时候,借助了一下 map,让时间复杂度降低为 O(1)。

3.测试结果

我们验证一下自己的实现:

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ICache<String, String> cache = CacheBs.<String,String>newInstance()
.size(3)
.evict(CacheEvicts.<String, String>lruDoubleListMap())
.build();
cache.put("A", "hello");
cache.put("B", "world");
cache.put("C", "FIFO");

// 访问一次A
cache.get("A");
cache.put("D", "LRU");

Assert.assertEquals(3, cache.size());
System.out.println(cache.keySet());

日志

1
2
[DEBUG] [2020-10-03 09:37:41.007] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict
[D, A, C]

因为我们访问过一次 A,所以 B 已经变成最少被访问的元素。

基于 LinkedHashMap 实现

实际上,LinkedHashMap 本身就是对于 list 和 hashMap 的一种结合的数据结构,我们可以直接使用 jdk 中 LinkedHashMap 去实现。

1.直接实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class LRUCache extends LinkedHashMap {

private int capacity;

public LRUCache(int capacity) {
// 注意这里将LinkedHashMap的accessOrder设为true
super(16, 0.75f, true);
this.capacity = capacity;
}

@Override
protected boolean removeEldestEntry(Map.Entry eldest) {
return super.size() >= capacity;
}
}

默认LinkedHashMap并不会淘汰数据,所以我们重写了它的removeEldestEntry()方法,当数据数量达到预设上限后,淘汰数据,accessOrder设为true意为按照访问的顺序排序。

整个实现的代码量并不大,主要都是应用LinkedHashMap的特性。

2.简单改造

我们对这个方法简单改造下,让其适应我们定义的接口。

3.测试结果

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
ICache<String, String> cache = CacheBs.<String,String>newInstance()
.size(3)
.evict(CacheEvicts.<String, String>lruLinkedHashMap())
.build();
cache.put("A", "hello");
cache.put("B", "world");
cache.put("C", "FIFO");
// 访问一次A
cache.get("A");
cache.put("D", "LRU");

Assert.assertEquals(3, cache.size());
System.out.println(cache.keySet());

日志

1
2
[DEBUG] [2020-10-03 10:20:57.842] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict
[D, A, C]

LRU扩展算法

当存在热点数据时,LRU的效率很好,但偶发性的、周期性的批量操作会导致LRU命中率急剧下降,缓存污染情况比较严重。

1. LRU-K

LRU-K中的K代表最近使用的次数,因此LRU可以认为是LRU-1。

LRU-K的主要目的是为了解决LRU算法“缓存污染”的问题,其核心思想是将“最近使用过1次”的判断标准扩展为“最近使用过K次”。

相比LRU,LRU-K需要多维护一个队列,用于记录所有缓存数据被访问的历史。只有当数据的访问次数达到K次的时候,才将数据放入缓存。

当需要淘汰数据时,LRU-K会淘汰第K次访问时间距当前时间最大的数据。

数据第一次被访问时,加入到历史访问列表,如果数据在访问历史列表中没有达到K次访问,则按照一定的规则(FIFO,LRU)淘汰;

当访问历史队列中的数据访问次数达到K次后,将数据索引从历史队列中删除,将数据移到缓存队列中,并缓存数据,缓存队列重新按照时间排序;

缓存数据队列中被再次访问后,重新排序,需要淘汰数据时,淘汰缓存队列中排在末尾的数据,即“淘汰倒数K次访问离现在最久的数据”。

LRU-K具有LRU的优点,同时还能避免LRU的缺点,实际应用中LRU-2是综合最优的选择。

由于LRU-K还需要记录那些被访问过、但还没有放入缓存的对象,因此内存消耗会比LRU要多。

2. two queue

Two queues(以下使用2Q代替)算法类似于LRU-2,不同点在于2Q将LRU-2算法中的访问历史队列(注意这不是缓存数据的)改为一个FIFO缓存队列,即:2Q算法有两个缓存队列,一个是FIFO队列,一个是LRU队列。

当数据第一次访问时,2Q算法将数据缓存在FIFO队列里面,当数据第二次被访问时,则将数据从FIFO队列移到LRU队列里面,两个队列各自按照自己的方法淘汰数据。

新访问的数据插入到FIFO队列中,如果数据在FIFO队列中一直没有被再次访问,则最终按照FIFO规则淘汰;

如果数据在FIFO队列中再次被访问到,则将数据移到LRU队列头部,如果数据在LRU队列中再次被访问,则将数据移动LRU队列头部,LRU队列淘汰末尾的数据。

3. Multi Queue(MQ)

MQ算法根据访问频率将数据划分为多个队列,不同的队列具有不同的访问优先级,其核心思想是:优先缓存访问次数多的数据

详细的算法结构图如下,Q0,Q1….Qk代表不同的优先级队列,Q-history代表从缓存中淘汰数据,但记录了数据的索引和引用次数的队列:

新插入的数据放入Q0,每个队列按照LRU进行管理,当数据的访问次数达到一定次数,需要提升优先级时,将数据从当前队列中删除,加入到高一级队列的头部;为了防止高优先级数据永远不会被淘汰,当数据在指定的时间里没有被访问时,需要降低优先级,将数据从当前队列删除,加入到低一级的队列头部;需要淘汰数据时,从最低一级队列开始按照LRU淘汰,每个队列淘汰数据时,将数据从缓存中删除,将数据索引加入Q-history头部。

如果数据在Q-history中被重新访问,则重新计算其优先级,移到目标队列头部。

Q-history按照LRU淘汰数据的索引。

MQ需要维护多个队列,且需要维护每个数据的访问时间,复杂度比LRU高。

4.LRU算法对比
对比点 对比
命中率 LRU-2 > MQ(2) > 2Q > LRU
复杂度 LRU-2 > MQ(2) > 2Q > LRU
代价 LRU-2 > MQ(2) > 2Q > LRU

实际上上面的几个算法,思想上大同小异。

核心目的:解决批量操作导致热点数据失效,缓存被污染的问题。

实现方式:增加一个队列,用来保存只访问一次的数据,然后根据次数不同,放入到 LRU 中。

只访问一次的队列,可以是 FIFO 队列,可以是 LRU,我们来实现一下 2Q 和 LRU-2 两种实现。

2Q算法实现

1.实现思路

实际上就是我们以前的 FIFO + LRU 二者的结合。

2.基本属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class CacheEvictLru2Q<K,V> extends AbstractCacheEvict<K,V> {

private static final Log log = LogFactory.getLog(CacheEvictLru2Q.class);

/**
* 队列大小限制
*
* 降低 O(n) 的消耗,避免耗时过长。
*/
private static final int LIMIT_QUEUE_SIZE = 1024;

/**
* 第一次访问的队列
*/
private Queue<K> firstQueue;

/**
* 头结点
*/
private DoubleListNode<K,V> head;

/**
* 尾巴结点
*/
private DoubleListNode<K,V> tail;

/**
* map 信息
*
* key: 元素信息
* value: 元素在 list 中对应的节点信息
*/
private Map<K, DoubleListNode<K,V>> lruIndexMap;

public CacheEvictLru2Q() {
this.firstQueue = new LinkedList<>();
this.lruIndexMap = new HashMap<>();
this.head = new DoubleListNode<>();
this.tail = new DoubleListNode<>();

this.head.next(this.tail);
this.tail.pre(this.head);
}

}
3.数据淘汰

数据淘汰的逻辑:

当缓存大小,已经达到最大限制时执行:

(1)优先淘汰 firstQueue 中的数据

(2)如果 firstQueue 中数据为空,则淘汰 lruMap 中的数据信息。

这里有一个假设:我们认为被多次访问的数据,重要性高于被只访问了一次的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
protected ICacheEntry<K, V> doEvict(ICacheEvictContext<K, V> context) {
ICacheEntry<K, V> result = null;
final ICache<K,V> cache = context.cache();
// 超过限制,移除队尾的元素
if(cache.size() >= context.size()) {
K evictKey = null;
//1. firstQueue 不为空,优先移除队列中元素
if(!firstQueue.isEmpty()) {
evictKey = firstQueue.remove();
} else {
// 获取尾巴节点的前一个元素
DoubleListNode<K,V> tailPre = this.tail.pre();
if(tailPre == this.head) {
log.error("当前列表为空,无法进行删除");
throw new CacheRuntimeException("不可删除头结点!");
}
evictKey = tailPre.key();
}
// 执行移除操作
V evictValue = cache.remove(evictKey);
result = new CacheEntry<>(evictKey, evictValue);
}
return result;
}
4.数据删除

当数据被删除时调用:

这个逻辑和以前类似,只是多了一个 FIFO 队列的移除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 移除元素
*
* 1. 获取 map 中的元素
* 2. 不存在直接返回,存在执行以下步骤:
* 2.1 删除双向链表中的元素
* 2.2 删除 map 中的元素
*
* @param key 元素
*/
@Override
public void removeKey(final K key) {
DoubleListNode<K,V> node = lruIndexMap.get(key);
//1. LRU 删除逻辑
if(ObjectUtil.isNotNull(node)) {
// A<->B<->C
// 删除 B,需要变成:A<->C
DoubleListNode<K,V> pre = node.pre();
DoubleListNode<K,V> next = node.next();
pre.next(next);
next.pre(pre);
// 删除 map 中对应信息
this.lruIndexMap.remove(node.key());
} else {
//2. FIFO 删除逻辑(O(n) 时间复杂度)
firstQueue.remove(key);
}
}
5.数据的更新

当数据被访问时,提升数据的优先级。

(1)如果在 lruMap 中,则首先移除,然后放入到头部

(2)如果不在 lruMap 中,但是在 FIFO 队列,则从 FIFO 队列中移除,添加到 LRU map 中。

(3)如果都不在,直接加入到 FIFO 队列中即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 放入元素
* 1. 如果 lruIndexMap 已经存在,则处理 lru 队列,先删除,再插入。
* 2. 如果 firstQueue 中已经存在,则处理 first 队列,先删除 firstQueue,然后插入 Lru。
* 1 和 2 是不同的场景,但是代码实际上是一样的,删除逻辑中做了二种场景的兼容。
*
* 3. 如果不在1、2中,说明是新元素,直接插入到 firstQueue 的开始即可。
*
* @param key 元素
*/
@Override
public void updateKey(final K key) {
//1.1 是否在 LRU MAP 中
//1.2 是否在 firstQueue 中
DoubleListNode<K,V> node = lruIndexMap.get(key);
if(ObjectUtil.isNotNull(node)
|| firstQueue.contains(key)) {
//1.3 删除信息
this.removeKey(key);
//1.4 加入到 LRU 中
this.addToLruMapHead(key);
return;
}
//2. 直接加入到 firstQueue 队尾
// if(firstQueue.size() >= LIMIT_QUEUE_SIZE) {
// // 避免第一次访问的列表一直增长,移除队头的元素
// firstQueue.remove();
// }
firstQueue.add(key);
}

这里我想到了一个优化点,限制 firstQueue 的一直增长,因为遍历的时间复杂度为 O(n),所以限制最大的大小为 1024。

如果超过了,则把 FIFO 中的元素先移除掉。

不过只移除 FIFO,不移除 cache,会导致二者的活跃程度不一致;

如果同时移除,但是 cache 的大小还没有满足,可能会导致超出用户的预期,这个可以作为一个优化点,暂时注释掉。

6.测试

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ICache<String, String> cache = CacheBs.<String,String>newInstance()
.size(3)
.evict(CacheEvicts.<String, String>lru2Q())
.build();

cache.put("A", "hello");
cache.put("B", "world");
cache.put("C", "FIFO");

// 访问一次A
cache.get("A");
cache.put("D", "LRU");

Assert.assertEquals(3, cache.size());
System.out.println(cache.keySet());

效果

1
2
[DEBUG] [2020-10-03 13:15:50.670] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict
[D, A, C]

LRU-2算法实现

1.实现LRU简介

FIFO 中的缺点还是比较明显的,需要 O(n) 的时间复杂度做遍历。

而且命中率和 LRU-2 比起来还是会差一点。

这里 LRU map 出现了多次,我们为了方便,将 LRU map 简单的封装为一个数据结构。

我们使用双向链表+HashMap 实现一个简单版本的。

2.节点

node 节点和以前一致:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class DoubleListNode<K,V> {

/**
* 键
*/
private K key;

/**
* 值
*/
private V value;

/**
* 前一个节点
*/
private DoubleListNode<K,V> pre;

/**
* 后一个节点
*/
private DoubleListNode<K,V> next;

//fluent getter & setter
}
3.接口

我们根据自己的需要,暂时定义 3 个最重要的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* LRU map 接口
*/
public interface ILruMap<K,V> {

/**
* 移除最老的元素
* @return 移除的明细
*/
ICacheEntry<K, V> removeEldest();

/**
* 更新 key 的信息
* @param key key
*/
void updateKey(final K key);

/**
* 移除对应的 key 信息
* @param key key
*/
void removeKey(final K key);

/**
* 是否为空
* @return 是否
*/
boolean isEmpty();

/**
* 是否包含元素
* @param key 元素
* @return 结果
*/
boolean contains(final K key);
}
4.实现

我们基于 DoubleLinkedList + HashMap 实现。

就是把上一节中的实现整理一下即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/**
* 基于双向列表的实现
*/
public class LruMapDoubleList<K,V> implements ILruMap<K,V> {

private static final Log log = LogFactory.getLog(LruMapDoubleList.class);

/**
* 头结点
*/
private DoubleListNode<K,V> head;

/**
* 尾巴节点
*/
private DoubleListNode<K,V> tail;

/**
* map 信息
*
* key: 元素信息
* value: 元素在 list 中对应的节点信息
*/
private Map<K, DoubleListNode<K,V>> indexMap;

public LruMapDoubleList() {
this.indexMap = new HashMap<>();
this.head = new DoubleListNode<>();
this.tail = new DoubleListNode<>();

this.head.next(this.tail);
this.tail.pre(this.head);
}

@Override
public ICacheEntry<K, V> removeEldest() {
// 获取尾巴节点的前一个元素
DoubleListNode<K,V> tailPre = this.tail.pre();
if(tailPre == this.head) {
log.error("当前列表为空,无法进行删除");
throw new CacheRuntimeException("不可删除头结点!");
}

K evictKey = tailPre.key();
V evictValue = tailPre.value();

return CacheEntry.of(evictKey, evictValue);
}

/**
* 放入元素
*
* (1)删除已经存在的
* (2)新元素放到元素头部
*
* @param key 元素
*/
@Override
public void updateKey(final K key) {
//1. 执行删除
this.removeKey(key);

//2. 新元素插入到头部
//head<->next
//变成:head<->new<->next
DoubleListNode<K,V> newNode = new DoubleListNode<>();
newNode.key(key);

DoubleListNode<K,V> next = this.head.next();
this.head.next(newNode);
newNode.pre(this.head);
next.pre(newNode);
newNode.next(next);

//2.2 插入到 map 中
indexMap.put(key, newNode);
}

/**
* 移除元素
*
* 1. 获取 map 中的元素
* 2. 不存在直接返回,存在执行以下步骤:
* 2.1 删除双向链表中的元素
* 2.2 删除 map 中的元素
*
* @param key 元素
*/
@Override
public void removeKey(final K key) {
DoubleListNode<K,V> node = indexMap.get(key);

if(ObjectUtil.isNull(node)) {
return;
}

// 删除 list node
// A<->B<->C
// 删除 B,需要变成:A<->C
DoubleListNode<K,V> pre = node.pre();
DoubleListNode<K,V> next = node.next();

pre.next(next);
next.pre(pre);

// 删除 map 中对应信息
this.indexMap.remove(key);
}

@Override
public boolean isEmpty() {
return indexMap.isEmpty();
}

@Override
public boolean contains(K key) {
return indexMap.containsKey(key);
}
}
5.基本属性

LRU 的实现保持不变。我们直接将 FIFO 替换为 LRU map 即可。

为了便于理解,我们将 FIFO 对应为 firstLruMap,用来存放用户只访问了一次的元素。

将原来的 LRU 中存入访问了 2 次及其以上的元素。

其他逻辑和 2Q 保持一致。

定义两个 LRU,用来分别存储访问的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class CacheEvictLru2<K,V> extends AbstractCacheEvict<K,V> {

private static final Log log = LogFactory.getLog(CacheEvictLru2.class);

/**
* 第一次访问的 lru
*/
private final ILruMap<K,V> firstLruMap;

/**
* 2次及其以上的 lru
*/
private final ILruMap<K,V> moreLruMap;

public CacheEvictLru2() {
this.firstLruMap = new LruMapDoubleList<>();
this.moreLruMap = new LruMapDoubleList<>();
}

}
6.淘汰实现

和 lru 2Q 模式类似,这里我们优先淘汰 firstLruMap 中的数据信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
protected ICacheEntry<K, V> doEvict(ICacheEvictContext<K, V> context) {
ICacheEntry<K, V> result = null;
final ICache<K,V> cache = context.cache();
// 超过限制,移除队尾的元素
if(cache.size() >= context.size()) {
ICacheEntry<K,V> evictEntry = null;
//1. firstLruMap 不为空,优先移除队列中元素
if(!firstLruMap.isEmpty()) {
evictEntry = firstLruMap.removeEldest();
log.debug("从 firstLruMap 中淘汰数据:{}", evictEntry);
} else {
//2. 否则从 moreLruMap 中淘汰数据
evictEntry = moreLruMap.removeEldest();
log.debug("从 moreLruMap 中淘汰数据:{}", evictEntry);
}
// 执行缓存移除操作
final K evictKey = evictEntry.key();
V evictValue = cache.remove(evictKey);
result = new CacheEntry<>(evictKey, evictValue);
}
return result;
}
7.删除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 移除元素
*
* 1. 多次 lru 中存在,删除
* 2. 初次 lru 中存在,删除
*
* @param key 元素
*/
@Override
public void removeKey(final K key) {
//1. 多次LRU 删除逻辑
if(moreLruMap.contains(key)) {
moreLruMap.removeKey(key);
log.debug("key: {} 从 moreLruMap 中移除", key);
} else {
firstLruMap.removeKey(key);
log.debug("key: {} 从 firstLruMap 中移除", key);
}
}
8.更新
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 更新信息
* 1. 如果 moreLruMap 已经存在,则处理 more 队列,先删除,再插入。
* 2. 如果 firstLruMap 中已经存在,则处理 first 队列,先删除 firstLruMap,然后插入 Lru。
* 1 和 2 是不同的场景,但是代码实际上是一样的,删除逻辑中做了二种场景的兼容。
*
* 3. 如果不在1、2中,说明是新元素,直接插入到 firstLruMap 的开始即可。
*
* @param key 元素
*/
@Override
public void updateKey(final K key) {
//1. 元素已经在多次访问,或者第一次访问的 lru 中
if(moreLruMap.contains(key)
|| firstLruMap.contains(key)) {
//1.1 删除信息
this.removeKey(key);
//1.2 加入到多次 LRU 中
moreLruMap.updateKey(key);
log.debug("key: {} 多次访问,加入到 moreLruMap 中", key);
} else {
// 2. 加入到第一次访问 LRU 中
firstLruMap.updateKey(key);
log.debug("key: {} 为第一次访问,加入到 firstLruMap 中", key);
}
}

实际上使用 LRU-2 的代码逻辑反而变得清晰了一些,主要是因为我们把 lruMap 作为独立的数据结构抽离了出去

9.测试

代码

1
2
3
4
5
6
7
8
9
10
11
12
ICache<String, String> cache = CacheBs.<String,String>newInstance()
.size(3)
.evict(CacheEvicts.<String, String>lru2Q())
.build();
cache.put("A", "hello");
cache.put("B", "world");
cache.put("C", "FIFO");
// 访问一次A
cache.get("A");
cache.put("D", "LRU");
Assert.assertEquals(3, cache.size());
System.out.println(cache.keySet());

日志

为了便于定位分析,源代码实现的时候,加了一点日志。

1
2
3
4
5
6
7
8
9
[DEBUG] [2020-10-03 14:39:04.966] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: A 为第一次访问,加入到 firstLruMap 中
[DEBUG] [2020-10-03 14:39:04.967] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: B 为第一次访问,加入到 firstLruMap 中
[DEBUG] [2020-10-03 14:39:04.968] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: C 为第一次访问,加入到 firstLruMap 中
[DEBUG] [2020-10-03 14:39:04.970] [main] [c.g.h.c.c.s.e.CacheEvictLru2.removeKey] - key: A 从 firstLruMap 中移除
[DEBUG] [2020-10-03 14:39:04.970] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: A 多次访问,加入到 moreLruMap 中
[DEBUG] [2020-10-03 14:39:04.972] [main] [c.g.h.c.c.s.e.CacheEvictLru2.doEvict] - 从 firstLruMap 中淘汰数据:EvictEntry{key=B, value=null}
[DEBUG] [2020-10-03 14:39:04.974] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict
[DEBUG] [2020-10-03 14:39:04.974] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: D 为第一次访问,加入到 firstLruMap 中
[D, A, C]
10.小结

对于 LRU 算法的改进我们主要做了两点:

(1)性能的改进,从 O(N) 优化到 O(1)

(2)批量操作的改进,避免缓存污染

四.过期功能的实现

缓存接口定义

我们首先来定义一下接口。

主要有两个:一个是多久之后过期,一个是在什么时候过期。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface ICache<K, V> extends Map<K, V> {

/**
* 设置过期时间
*
* @param key key
* @param timeInMills 毫秒时间之后过期
* @return this
*/
ICache<K, V> expire(final K key, final long timeInMills);

/**
* 在指定的时间过期
* @param key key
* @param timeInMills 时间戳
* @return this
*/
ICache<K, V> expireAt(final K key, final long timeInMills);

}

缓存接口实现

为了便于处理,我们将多久之后过期,进行计算。将两个问题变成同一个问题,在什么时候过期的问题。

核心的代码,主要还是看 cacheExpire 接口。

1
2
3
4
5
6
7
8
9
10
11
@Override
public ICache<K, V> expire(K key, long timeInMills) {
long expireTime = System.currentTimeMillis() + timeInMills;
return this.expireAt(key, expireTime);
}

@Override
public ICache<K, V> expireAt(K key, long timeInMills) {
this.cacheExpire.expire(key, timeInMills);
return this;
}

缓存过期接口

这里为了便于后期拓展,对于过期的处理定义为接口,便于后期灵活替换

其中 expire(final K key, final long expireAt); 就是我们方法中调用的地方。

refershExpire 属于惰性删除,需要进行刷新时才考虑,我们后面讲解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface ICacheExpire<K,V> {

/**
* 指定过期信息
* @param key key
* @param expireAt 什么时候过期
*/
void expire(final K key, final long expireAt);

/**
* 惰性删除中需要处理的 keys
* @param keyList keys
*/
void refreshExpire(final Collection<K> keyList);

}

expire 实现原理

其实过期的实思路也比较简单:我们可以开启一个定时任务,比如 1 秒钟做一次轮训,将过期的信息清空。

1.过期信息的存储
1
2
3
4
5
6
7
8
9
10
11
/**
* 过期 map
*
* 空间换时间
*/
private final Map<K, Long> expireMap = new HashMap<>();

@Override
public void expire(K key, long expireAt) {
expireMap.put(key, expireAt);
}

我们定义一个 map,key 是对应的要过期的信息,value 存储的是过期时间。

2.轮询清理

我们固定 100ms 清理一次,每次最多清理 100 个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 单次清空的数量限制
*/
private static final int LIMIT = 100;

/**
* 缓存实现
*/
private final ICache<K,V> cache;
/**
* 线程执行类
*/
private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
public CacheExpire(ICache<K, V> cache) {
this.cache = cache;
this.init();
}
/**
* 初始化任务
*/
private void init() {
EXECUTOR_SERVICE.scheduleAtFixedRate(new ExpireThread(), 100, 100, TimeUnit.MILLISECONDS);
}

这里定义了一个单线程,用于执行清空任务。

3.清空任务

这个非常简单,遍历过期数据,判断对应的时间,如果已经到期了,则执行清空操作。

为了避免单次执行时间过长,最多只处理 100 条

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 定时执行任务
*/
private class ExpireThread implements Runnable {
@Override
public void run() {
//1.判断是否为空
if(MapUtil.isEmpty(expireMap)) {
return;
}
//2. 获取 key 进行处理
int count = 0;
for(Map.Entry<K, Long> entry : expireMap.entrySet()) {
if(count >= LIMIT) {
return;
}
expireKey(entry);
count++;
}
}
}

/**
* 执行过期操作
* @param entry 明细
*/
private void expireKey(Map.Entry<K, Long> entry) {
final K key = entry.getKey();
final Long expireAt = entry.getValue();
// 删除的逻辑处理
long currentTime = System.currentTimeMillis();
if(currentTime >= expireAt) {
expireMap.remove(key);
// 再移除缓存,后续可以通过惰性删除做补偿
cache.remove(key);
}
}

惰性删除

1.出现的原因

类似于 redis,我们采用定时删除的方案,就有一个问题:可能数据清理的不及时。

那当我们查询时,可能获取到到是脏数据。

于是就有一些人就想了,当我们关心某些数据时,才对数据做对应的删除判断操作,这样压力会小很多。

算是一种折中方案。

2.需要惰性删除的方法

一般就是各种查询方法,比如我们获取 key 对应的值时

1
2
3
4
5
6
7
8
@Override
@SuppressWarnings("unchecked")
public V get(Object key) {
//1. 刷新所有过期信息
K genericKey = (K) key;
this.cacheExpire.refreshExpire(Collections.singletonList(genericKey));
return map.get(key);
}

我们在获取之前,先做一次数据的刷新。

3.刷新的实现

实现原理也非常简单,就是一个循环,然后作删除即可。

这里加了一个小的优化:选择数量少的作为外循环。

循环集合的时间复杂度是 O(n), map.get() 的时间复杂度是 O(1);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public void refreshExpire(Collection<K> keyList) {
if(CollectionUtil.isEmpty(keyList)) {
return;
}
// 判断大小,小的作为外循环。一般都是过期的 keys 比较小。
if(keyList.size() <= expireMap.size()) {
for(K key : keyList) {
expireKey(key);
}
} else {
for(Map.Entry<K, Long> entry : expireMap.entrySet()) {
this.expireKey(entry);
}
}
}

测试

上面的代码写完之后,我们就可以验证一下了。

1
2
3
4
5
6
7
8
9
10
11
12
13
ICache<String, String> cache = CacheBs.<String,String>newInstance()
.size(3)
.build();
cache.put("1", "1");
cache.put("2", "2");

cache.expire("1", 10);
Assert.assertEquals(2, cache.size());

TimeUnit.MILLISECONDS.sleep(50);
Assert.assertEquals(1, cache.size());

System.out.println(cache.keySet());

结果也符合我们的预期

五.过期功能的优化

上面的过期功能的实现中存在两个问题:

(1)keys 的选择不够随机,可能会导致每次循环 100 个结束时,真正需要过期的没有被遍历到

(2)keys 的遍历可能大部分都是无效的

所以下面以过期时间为维度对过期功能进行优化

基于时间的遍历

1.思路

我们换一种思路,让过期的时间做 key,相同时间的需要过期的信息放在一个列表中,作为 value

然后对过期时间排序,轮询的时候就可以快速判断出是否有过期的信息了。

我们每次 put 放入过期元素时,根据过期时间对元素进行排序,相同的过期时间的 Keys 放在一起。

优点:定时遍历的时候,如果时间不到当前时间,就可以直接返回了,大大降低无效遍历。

缺点:考虑到惰性删除问题,还是需要存储以删除信息作为 key 的 map 关系,这样内存基本翻倍。

2.基本属性定义

我们这里使用 TreeMap 帮助我们进行过期时间的排序,这个集合后续有时间可以详细讲解了,我大概看了下 jdk1.8 的源码,主要是通过红黑树实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class CacheExpireSort<K,V> implements ICacheExpire<K,V> {

/**
* 单次清空的数量限制
*/
private static final int LIMIT = 100;

/**
* 排序缓存存储
*
* 使用按照时间排序的缓存处理。
*/
private final Map<Long, List<K>> sortMap = new TreeMap<>(new Comparator<Long>() {
@Override
public int compare(Long o1, Long o2) {
return (int) (o1 - o2);
}
});

/**
* 过期 map
*
* 空间换时间
*/
private final Map<K, Long> expireMap = new HashMap<>();

/**
* 缓存实现
*/
private final ICache<K,V> cache;

}
3.放入元素时

每次存入新元素时,同时放入 sortMap 和 expireMap。

1
2
3
4
5
6
7
8
9
10
11
@Override
public void expire(K key, long expireAt) {
List<K> keys = sortMap.get(expireAt);
if(keys == null) {
keys = new ArrayList<>();
}
keys.add(key);
// 设置对应的信息
sortMap.put(expireAt, keys);
expireMap.put(key, expireAt);
}

定时任务的执行

1.定义

我们定义一个定时任务,100ms 执行一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 线程执行类
*/
private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();

public CacheExpireSort(ICache<K, V> cache) {
this.cache = cache;
this.init();
}
/**
* 初始化任务
*/
private void init() {
EXECUTOR_SERVICE.scheduleAtFixedRate(new ExpireThread(), 100, 100, TimeUnit.MILLISECONDS);
}
2.执行任务

实现源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 定时执行任务
*/
private class ExpireThread implements Runnable {
@Override
public void run() {
//1.判断是否为空
if(MapUtil.isEmpty(sortMap)) {
return;
}
//2. 获取 key 进行处理
int count = 0;
for(Map.Entry<Long, List<K>> entry : sortMap.entrySet()) {
final Long expireAt = entry.getKey();
List<K> expireKeys = entry.getValue();
// 判断队列是否为空
if(CollectionUtil.isEmpty(expireKeys)) {
sortMap.remove(expireAt);
continue;
}
if(count >= LIMIT) {
return;
}
// 删除的逻辑处理
long currentTime = System.currentTimeMillis();
if(currentTime >= expireAt) {
Iterator<K> iterator = expireKeys.iterator();
while (iterator.hasNext()) {
K key = iterator.next();
// 先移除本身
iterator.remove();
expireMap.remove(key);
// 再移除缓存,后续可以通过惰性删除做补偿
cache.remove(key);
count++;
}
} else {
// 直接跳过,没有过期的信息
return;
}
}
}
}

这里直接遍历 sortMap,对应的 key 就是过期时间,然后和当前时间对比即可。

删除的时候,需要删除 expireMap/sortMap/cache。

惰性删除刷新

惰性删除刷新时,就会用到 expireMap。

因为有时候刷新的 key 就一个,如果没有 expireMap 映射关系,可能要把 sortMap 全部遍历一遍才能找到对应的过期时间。

就是一个时间复杂度与空间复杂度衡量的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Override
public void refreshExpire(Collection<K> keyList) {
if(CollectionUtil.isEmpty(keyList)) {
return;
}
// 遍历判断过期信息
// 这样维护两套的代价太大,后续优化,暂时不用。
// 判断大小,小的作为外循环
final int expireSize = expireMap.size();
if(expireSize <= keyList.size()) {
// 一般过期的数量都是较少的
for(Map.Entry<K,Long> entry : expireMap.entrySet()) {
K key = entry.getKey();
// 这里直接执行过期处理,不再判断是否存在于集合中。
// 因为基于集合的判断,时间复杂度为 O(n)
this.removeExpireKey(key);
}
} else {
for(K key : keyList) {
this.removeExpireKey(key);
}
}
}

/**
* 移除过期信息
* @param key key
*/
private void removeExpireKey(final K key) {
Long expireTime = expireMap.get(key);
if(expireTime != null) {
final long currentTime = System.currentTimeMillis();
if(currentTime >= expireTime) {
expireMap.remove(key);
List<K> expireKeys = sortMap.get(expireTime);
expireKeys.remove(key);
sortMap.put(expireTime, expireKeys);
}
}
}

六.RBD持久化功能的实现

缓存的持久化功能分为以下两个部分:

  • Cache 的内容持久化到文件或者数据库
  • 初始化的时候加载持久化数据

持久化操作

1.持久化操作接口

为了便于灵活替换,我们定义一个持久化的接口。

1
2
3
4
5
6
7
8
9
public interface ICachePersist<K, V> {

/**
* 持久化缓存信息
* @param cache 缓存
*/
void persist(final ICache<K, V> cache);

}
2.持久化操作接口实现

我们实现一个最简单的基于 json 的持久化,当然后期可以添加类似于 AOF 的持久化模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class CachePersistDbJson<K,V> implements ICachePersist<K,V> {

/**
* 数据库路径
*/
private final String dbPath;

public CachePersistDbJson(String dbPath) {
this.dbPath = dbPath;
}

/**
* 持久化
* key长度 key+value
* 第一个空格,获取 key 的长度,然后截取
* @param cache 缓存
*/
@Override
public void persist(ICache<K, V> cache) {
Set<Map.Entry<K,V>> entrySet = cache.entrySet();

// 创建文件
FileUtil.createFile(dbPath);
// 清空文件
FileUtil.truncate(dbPath);

for(Map.Entry<K,V> entry : entrySet) {
K key = entry.getKey();
Long expireTime = cache.expire().expireTime(key);
// 填充持久化数据
PersistEntry<K,V> persistEntry = new PersistEntry<>();
persistEntry.setKey(key);
persistEntry.setValue(entry.getValue());
persistEntry.setExpire(expireTime);

// 持久化数据转换为JSON数据
String line = JSON.toJSONString(persistEntry);
// JSON数据追加到文件中
FileUtil.write(dbPath, line, StandardOpenOption.APPEND);
}
}

}
3.持久化操作的触发

上面定义好了一种持久化的策略,但是没有提供对应的触发方式。

我们就采用对用户透明的设计方式:定时执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class InnerCachePersist<K,V> {

private static final Log log = LogFactory.getLog(InnerCachePersist.class);

/**
* 缓存信息
*/
private final ICache<K,V> cache;

/**
* 缓存持久化策略
*/
private final ICachePersist<K,V> persist;

/**
* 线程执行类
*/
private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();

public InnerCachePersist(ICache<K, V> cache, ICachePersist<K, V> persist) {
this.cache = cache;
this.persist = persist;

// 初始化
this.init();
}

/**
* 初始化
*/
private void init() {
EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
log.info("开始持久化缓存信息");
persist.persist(cache);
log.info("完成持久化缓存信息");
} catch (Exception exception) {
log.error("文件持久化异常", exception);
}
}
}, 0, 10, TimeUnit.MINUTES);
}

}

定时执行的时间间隔为 10min。

4.测试

我们只需要在创建 Cache 时,指定我们的持久化策略即可。

1
2
3
4
5
6
ICache<String, String> cache = CacheBs.<String,String>newInstance()
.load(new MyCacheLoad())
.persist(CachePersists.<String, String>dbJson("1.rdb"))
.build();
Assert.assertEquals(2, cache.size());
TimeUnit.SECONDS.sleep(5);

为了确保文件持久化完成,我们沉睡了一会儿。

测试结果如下

生成1.rdb文件内容如下:

1
2
{"key":"2","value":"2"}
{"key":"1","value":"1"}

缓存数据的加载

1.缓存初始化接口

缓存初始化即从持久化中提取数据到缓存内存中

为了便于后期拓展,定义 ICacheLoad 接口。

1
2
3
4
5
6
7
8
9
public interface ICacheLoad<K, V> {

/**
* 加载缓存信息
* @param cache 缓存
*/
void load(final ICache<K,V> cache);

}
2.缓存初始化接口实现

我们只需要实现以下对应的加载即可,解析文件,然后初始化 cache。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 加载策略-文件路径
*/
public class CacheLoadDbJson<K,V> implements ICacheLoad<K,V> {

private static final Log log = LogFactory.getLog(CacheLoadDbJson.class);

/**
* 文件路径
*/
private final String dbPath;

public CacheLoadDbJson(String dbPath) {
this.dbPath = dbPath;
}

@Override
public void load(ICache<K, V> cache) {
List<String> lines = FileUtil.readAllLines(dbPath);
log.info("[load] 开始处理 path: {}", dbPath);
if(CollectionUtil.isEmpty(lines)) {
log.info("[load] path: {} 文件内容为空,直接返回", dbPath);
return;
}

for(String line : lines) {
if(StringUtil.isEmpty(line)) {
continue;
}

// 执行
// 简单的类型还行,复杂的这种反序列化会失败
PersistEntry<K,V> entry = JSON.parseObject(line, PersistEntry.class);

K key = entry.getKey();
V value = entry.getValue();
Long expire = entry.getExpire();

cache.put(key, value);
if(ObjectUtil.isNotNull(expire)) {
cache.expireAt(key, expire);
}
}
}
}

然后在初始化时使用即可。

七.AOF持久化功能的实现

Redis AOF 解析

1.为什么选择 AOF?

AOF 模式的性能特别好

用过 kafka 的同学肯定知道,kafka 也用到了顺序写这个特性。

顺序写添加文件内容,避免了文件 IO 的随机写问题,性能基本可以和内存媲美。

AOF 的实时性更好,这个是相对于 RDB 模式而言的:

我们原来使用 RDB 模式,将缓存内容全部持久化,这个是比较耗时的动作,一般是几分钟持久化一次。

AOF 模式主要是针对修改内容的指令,然后将所有的指令顺序添加到文件中。这样的话,实时性会好很多,可以提升到秒级别,甚至秒级别。可以将AOF模式理解为一个操作流水表

2.AOF 的吞吐量

AOF 模式可以每次操作都进行持久化,但是这样会导致吞吐量大大下降。

提升吞吐量最常用的方式就是批量,这个 kafka 中也是类似的,比如我们可以 1s 持久化一次,将 1s 内的操作全部放入 buffer 中。

这里其实就是一个 trade-off 问题,实时性与吞吐量的平衡艺术。

实际业务中,1s 的误差一般都是可以接受的,所以这个也是业界比较认可的方式。

3.AOF 的异步+多线程

kafka 中所有的操作实际上都是异步+回调的方式实现的。

异步+多线程,确实可以提升操作的性能。

当然 redis 6 以前,其实一直是单线程的。那为什么性能依然这么好呢?

其实多线程也有代价,那就是线程上下文的切换是需要耗时的,保持并发的安全问题,也需要加锁,从而降低性能。

所以这里要考虑异步的收益,与付出的耗时是否成正比的问题。

4.AOF 的落盘

我们 AOF 与 RDB 模式,归根结底都是基于操作系统的文件系统做持久化的。

对于开发者而言,可能就是调用一个 api 就实现了,但是实际持久化落盘的动作并不见得就是一步完成的。

文件系统为了提升吞吐量,也会采用类似 buffer 的方式。这忽然有一点俄罗斯套娃的味道。

但是优秀的设计总是相似的,比如说缓存从 cpu 的设计中就有 L1/L2 等等,思路是一致的。

阿里的很多开源技术,都会针对操作系统的落盘做进一步的优化,这个我们后续做深入学习

5.AOF 的缺陷

大道缺一,没有银弹。

AOF 千好万好,和 RDB 对比也存在一个缺陷,那就是指令

AOF注解实现

1.接口

接口和 rdb 的保持一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 持久化缓存接口
* @param <K> key
* @param <V> value
*/
public interface ICachePersist<K, V> {

/**
* 持久化缓存信息
* @param cache 缓存
*/
void persist(final ICache<K, V> cache);

}
2.注解定义

为了和耗时统计,刷新等特性保持一致,对于操作类的动作才添加到文件中(append to file)我们也基于注解属性来指定,而不是固定写死在代码中,便于后期拓展调整

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 缓存拦截器
*/
@Documented
@Inherited
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CacheInterceptor {

/**
* 操作是否需要 append to file,默认为 false
* 主要针对 cache 内容有变更的操作,不包括查询操作。
* 包括删除,添加,过期等操作。
* @return 是否
*/
boolean aof() default false;

}

我们在原来的 @CacheInterceptor 注解中添加 aof 属性,用于指定是否对操作开启 aof 模式

3.过期操作中启用AOF

类似于 spring 的事务拦截器,我们使用代理类调用 expireAt。

expire 方法就不需要添加 aof 拦截了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 设置过期时间
* @param key key
* @param timeInMills 毫秒时间之后过期
* @return this
*/
@Override
@CacheInterceptor
public ICache<K, V> expire(K key, long timeInMills) {
long expireTime = System.currentTimeMillis() + timeInMills;
// 使用代理调用
Cache<K,V> cachePoxy = (Cache<K, V>) CacheProxy.getProxy(this);
return cachePoxy.expireAt(key, expireTime);
}

/**
* 指定过期信息
* @param key key
* @param timeInMills 时间戳
* @return this
*/
@Override
@CacheInterceptor(aof = true)
public ICache<K, V> expireAt(K key, long timeInMills) {
this.expire.expire(key, timeInMills);
return this;
}
4.变更操作中启用AOF
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Override
@CacheInterceptor(aof = true)
public V put(K key, V value) {
//1.1 尝试驱除
CacheEvictContext<K,V> context = new CacheEvictContext<>();
context.key(key).size(sizeLimit).cache(this);
boolean evictResult = evict.evict(context);
if(evictResult) {
// 执行淘汰监听器
ICacheRemoveListenerContext<K,V> removeListenerContext = CacheRemoveListenerContext.<K,V>newInstance().key(key).value(value).type(CacheRemoveType.EVICT.code());
for(ICacheRemoveListener<K,V> listener : this.removeListeners) {
listener.listen(removeListenerContext);
}
}
//2. 判断驱除后的信息
if(isSizeLimit()) {
throw new CacheRuntimeException("当前队列已满,数据添加失败!");
}
//3. 执行添加
return map.put(key, value);
}

@Override
@CacheInterceptor(aof = true)
public V remove(Object key) {
return map.remove(key);
}

@Override
@CacheInterceptor(aof = true)
public void putAll(Map<? extends K, ? extends V> m) {
map.putAll(m);
}

@Override
@CacheInterceptor(refresh = true, aof = true)
public void clear() {
map.clear();
}

AOF 持久化拦截实现

1.持久化对象定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* AOF 持久化明细
*/
public class PersistAofEntry {

/**
* 参数信息
*/
private Object[] params;

/**
* 方法名称
*/
private String methodName;(

//getter & setter &toString
}

这里我们只需要方法名,和参数对象。

暂时实现的简单一些即可

2.持久化拦截器

我们定义拦截器,当 cache 中定义的持久化类为 CachePersistAof 时,将操作的信息放入到 CachePersistAof 的 buffer 列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class CacheInterceptorAof<K,V> implements ICacheInterceptor<K, V> {

private static final Log log = LogFactory.getLog(CacheInterceptorAof.class);

@Override
public void before(ICacheInterceptorContext<K,V> context) {
}

@Override
public void after(ICacheInterceptorContext<K,V> context) {
// 持久化类
ICache<K,V> cache = context.cache();
ICachePersist<K,V> persist = cache.persist();

if(persist instanceof CachePersistAof) {
CachePersistAof<K,V> cachePersistAof = (CachePersistAof<K,V>) persist;

String methodName = context.method().getName();
PersistAofEntry aofEntry = PersistAofEntry.newInstance();
aofEntry.setMethodName(methodName);
aofEntry.setParams(context.params());

String json = JSON.toJSONString(aofEntry);

// 直接持久化
log.debug("AOF 开始追加文件内容:{}", json);
cachePersistAof.append(json);
log.debug("AOF 完成追加文件内容:{}", json);
}
}

}
3.拦截器调用

当 AOF 的注解属性为 true 时,调用上述拦截器即可。

这里为了避免浪费,只有当持久化类为 AOF 模式时,才进行调用。

1
2
3
4
5
6
7
8
9
//3. AOF 追加
final ICachePersist cachePersist = cache.persist();
if(cacheInterceptor.aof() && (cachePersist instanceof CachePersistAof)) {
if(before) {
persistInterceptors.before(interceptorContext);
} else {
persistInterceptors.after(interceptorContext);
}
}

AOF持久化实现

这里的 AOF 模式和以前的 RDB 持久化类只是不同的模式,实际上二者是相同的接口。

1.接口

这里我们统一定义了不同的持久化类的时间,便于 RDB 与 AOF 不同任务的不同时间间隔触发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public interface ICachePersist<K, V> {

/**
* 持久化缓存信息
* @param cache 缓存
*/
void persist(final ICache<K, V> cache);

/**
* 延迟时间
* @return 延迟
*/
long delay();

/**
* 时间间隔
* @return 间隔
*/
long period();

/**
* 时间单位
* @return 时间单位
*/
TimeUnit timeUnit();
}
2.持久化类实现

实现一个 Buffer 列表,用于每次拦截器直接顺序添加

持久化的实现也比较简单,追加到文件之后,直接清空 buffer 列表即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* 缓存持久化-AOF 持久化模式
*/
public class CachePersistAof<K,V> extends CachePersistAdaptor<K,V> {

private static final Log log = LogFactory.getLog(CachePersistAof.class);

/**
* 缓存列表
*/
private final List<String> bufferList = new ArrayList<>();

/**
* 数据持久化路径
*/
private final String dbPath;

public CachePersistAof(String dbPath) {
this.dbPath = dbPath;
}

/**
* 持久化
* key长度 key+value
* 第一个空格,获取 key 的长度,然后截取
* @param cache 缓存
*/
@Override
public void persist(ICache<K, V> cache) {
log.info("开始 AOF 持久化到文件");
// 1. 创建文件
if(!FileUtil.exists(dbPath)) {
FileUtil.createFile(dbPath);
}
// 2. 持久化追加到文件中
FileUtil.append(dbPath, bufferList);

// 3. 清空 buffer 列表
bufferList.clear();
log.info("完成 AOF 持久化到文件");
}

@Override
public long delay() {
return 1;
}

@Override
public long period() {
return 1;
}

@Override
public TimeUnit timeUnit() {
return TimeUnit.SECONDS;
}

/**
* 添加文件内容到 buffer 列表中
* @param json json 信息
*/
public void append(final String json) {
if(StringUtil.isNotEmpty(json)) {
bufferList.add(json);
}
}

}

持久化测试

1.测试代码
1
2
3
4
5
6
7
ICache<String, String> cache = CacheBs.<String,String>newInstance()
.persist(CachePersists.<String, String>aof("1.aof"))
.build();
cache.put("1", "1");
cache.expire("1", 10);
cache.remove("2");
TimeUnit.SECONDS.sleep(1);
2.测试日志

expire 实际上调用的是 expireAt。

1
2
3
4
5
6
7
8
9
10
11
[DEBUG] [2020-10-02 12:20:41.979] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 开始追加文件内容:{"methodName":"put","params":["1","1"]}
[DEBUG] [2020-10-02 12:20:41.980] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件内容:{"methodName":"put","params":["1","1"]}
[DEBUG] [2020-10-02 12:20:41.982] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 开始追加文件内容:{"methodName":"expireAt","params":["1",1601612441990]}
[DEBUG] [2020-10-02 12:20:41.982] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件内容:{"methodName":"expireAt","params":["1",1601612441990]}
[DEBUG] [2020-10-02 12:20:41.984] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 开始追加文件内容:{"methodName":"remove","params":["2"]}
[DEBUG] [2020-10-02 12:20:41.984] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件内容:{"methodName":"remove","params":["2"]}
[DEBUG] [2020-10-02 12:20:42.088] [pool-1-thread-1] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: 1, value: 1, type: expire
[INFO] [2020-10-02 12:20:42.789] [pool-2-thread-1] [c.g.h.c.c.s.p.InnerCachePersist.run] - 开始持久化缓存信息
[INFO] [2020-10-02 12:20:42.789] [pool-2-thread-1] [c.g.h.c.c.s.p.CachePersistAof.persist] - 开始 AOF 持久化到文件
[INFO] [2020-10-02 12:20:42.798] [pool-2-thread-1] [c.g.h.c.c.s.p.CachePersistAof.persist] - 完成 AOF 持久化到文件
[INFO] [2020-10-02 12:20:42.799] [pool-2-thread-1] [c.g.h.c.c.s.p.InnerCachePersist.run] - 完成持久化缓存信息
3.文件内容

1.aof 的文件内容如下

1
2
3
{"methodName":"put","params":["1","1"]}
{"methodName":"expireAt","params":["1",1601612441990]}
{"methodName":"remove","params":["2"]}

将每一次的操作,简单的存储到文件中

AOF 加载实现

1.加载

类似于 RDB 的加载模式,aof 的加载模式也是类似的。

我们需要根据文件的内容,还原以前的缓存的内容。

实现思路:遍历文件内容,反射调用原来的方法。

2.代码实现

解析文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public void load(ICache<K, V> cache) {
List<String> lines = FileUtil.readAllLines(dbPath);
log.info("[load] 开始处理 path: {}", dbPath);
if(CollectionUtil.isEmpty(lines)) {
log.info("[load] path: {} 文件内容为空,直接返回", dbPath);
return;
}

for(String line : lines) {
if(StringUtil.isEmpty(line)) {
continue;
}
// 执行
// 简单的类型还行,复杂的这种反序列化会失败
PersistAofEntry entry = JSON.parseObject(line, PersistAofEntry.class);
final String methodName = entry.getMethodName();
final Object[] objects = entry.getParams();
final Method method = METHOD_MAP.get(methodName);
// 反射调用
ReflectMethodUtil.invoke(cache, method, objects);
}
}

方法映射的预加载

Method 反射是固定的,为了提升性能,我们做一下预处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 方法缓存
* 暂时比较简单,直接通过方法判断即可,不必引入参数类型增加复杂度。
*/
private static final Map<String, Method> METHOD_MAP = new HashMap<>();
static {
Method[] methods = Cache.class.getMethods();
for(Method method : methods){
CacheInterceptor cacheInterceptor = method.getAnnotation(CacheInterceptor.class);
if(cacheInterceptor != null) {
// 暂时
if(cacheInterceptor.aof()) {
String methodName = method.getName();
METHOD_MAP.put(methodName, method);
}
}
}
}

持久化加载测试

1.文件内容
  • default.aof
1
{"methodName":"put","params":["1","1"]}
2.测试
1
2
3
4
5
6
ICache<String, String> cache = CacheBs.<String,String>newInstance()
.load(CacheLoads.<String, String>aof("default.aof"))
.build();

Assert.assertEquals(1, cache.size());
System.out.println(cache.keySet());

直接将 default.aof 文件加载到 cache 缓存中

八.监听器的开发

下面我们将一起学习一下如何实现类似 guava-cache 中的 removeListener 删除监听器,和类似 redis 中的慢日志监控的 slowListener

删除监听器:将数据驱除或过期时删除的数据记录打印到删除日志中

慢操作监听器:当操作变慢时,将警告信息或报警信息打印到慢日志中

删除监听器的开发

1.基本介绍

我们在两种场景下删除数据是对用户透明的:

(1)size 满了之后,进行数据淘汰。

(2)expire 过期时,清除数据。

这两个特性对用户本来应该是无感的,不过用户如果关心的话,也可以通过添加删除监听器来获取到相关的变更信息

2.实现思路

为了实现删除的监听,我们需要找到删除的位置,然后调用监听器即可

  • evict 驱除的场景

​ 每次 put 数据时,都会校验 size 是否达到最大的限制,如果达到,则进行 evict 淘汰

  • expire 过期的场景

​ 用户指定 expire 时间之后,回后台异步执行刷新

​ 也存在惰性删除的场景

3.接口定义

为了统一,我们将所有的删除都定义统一的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 删除监听器接口
* @param <K> key
* @param <V> value
*/
public interface ICacheRemoveListener<K,V> {

/**
* 监听
* @param context 上下文
*/
void listen(final ICacheRemoveListenerContext<K,V> context);

}
4.内置实现

系统内置的实现如下:

1
2
3
4
5
6
7
8
9
10
11
public class CacheRemoveListener<K,V> implements ICacheRemoveListener<K,V> {

private static final Log log = LogFactory.getLog(CacheRemoveListener.class);

@Override
public void listen(ICacheRemoveListenerContext<K, V> context) {
log.debug("Remove key: {}, value: {}, type: {}",
context.key(), context.value(), context.type());
}

}

这个监听器是默认开启的,暂时无法关闭

5.自定义

用户可以自己的需要,进行自定义实现:

1
2
3
4
5
6
7
8
public class MyRemoveListener<K,V> implements ICacheRemoveListener<K,V> {

@Override
public void listen(ICacheRemoveListenerContext<K, V> context) {
System.out.println("【删除提示】可恶,我竟然被删除了!" + context.key());
}

}
6.使用测试
1
2
3
4
5
6
7
ICache<String, String> cache = CacheBs.<String,String>newInstance()
.size(1)
.addRemoveListener(new MyRemoveListener<String, String>())
.build();

cache.put("1", "1");
cache.put("2", "2");

我们指定 cache 的大小为1,设置我们自定义的删除监听器

这里的删除监听器可以添加多个

7.测试结果

测试日志如下:

1
2
[DEBUG] [2020-09-30 19:32:54.617] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: 2, value: 2, type: evict
【删除提示】可恶,我竟然被删除了!2

慢操作监听器开发

1.说明

redis 中会存储慢操作的相关日志信息,主要是由两个参数构成:

(1)slowlog-log-slower-than 预设阈值,它的单位是毫秒(1秒=1000000微秒)默认值是10000

(2)slowlog-max-len 最多存储多少条的慢日志记录

不过 redis 是直接存储到内存中,而且有长度限制。

根据实际工作体验,如果我们可以添加慢日志的监听,然后有对应的存储或者报警,这样更加方便问题的分析和快速反馈。

所以我们引入类似于删除的监听器。

2.实现思路

我们处理所有的 cache 操作,并且记录对应的操作耗时。

如果耗时操作用户设置的时间阈值,则调用慢操作监听器。

3.接口定义

为了保证接口的灵活性,每一个实现都可以定义自己的慢操作阈值,这样便于分级处理。

比如超过 100ms,用户可以选择输出 warn 日志;超过 1s,可能影响到业务了,可以直接接入报警系统。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ICacheSlowListener {

/**
* 监听
* @param context 上下文
*/
void listen(final ICacheSlowListenerContext context);

/**
* 慢日志的阈值
* @return 慢日志的阙值
*/
long slowerThanMills();

}
4.自定义监听器

实现接口 ICacheSlowListener

这里每一个监听器都可以指定自己的慢日志阈值,便于分级处理

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MySlowListener implements ICacheSlowListener {

@Override
public void listen(ICacheSlowListenerContext context) {
System.out.println("【慢日志】name: " + context.methodName());
}

@Override
public long slowerThanMills() {
return 0;
}

}
5.使用测试
1
2
3
4
5
6
ICache<String, String> cache = CacheBs.<String,String>newInstance()
.addSlowListener(new MySlowListener())
.build();

cache.put("1", "2");
cache.get("1");
6.测试结果
1
2
3
4
5
6
[DEBUG] [2020-09-30 17:40:11.547] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.before] - Cost start, method: put
[DEBUG] [2020-09-30 17:40:11.551] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.after] - Cost end, method: put, cost: 10ms
【慢日志】name: put
[DEBUG] [2020-09-30 17:40:11.554] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.before] - Cost start, method: get
[DEBUG] [2020-09-30 17:40:11.554] [main] [c.g.h.c.c.s.i.c.CacheInterceptorCost.after] - Cost end, method: get, cost: 1ms
【慢日志】name: get

实际工作中,我们可以针对慢日志数据存储,便于后期分析。

也可以直接接入报警系统,及时反馈问题

九.Redis 渐进式 Rehash详解

HashMap 简介

1.HashMap 的 rehash

读过 HashMap 源码的同学,应该都知道 map 在扩容的时候,有一个 rehash 的过程

2.HashMap 的扩容简介

这里简单介绍下:

扩容(resize)就是重新计算容量,向HashMap对象里不停的添加元素,而HashMap对象内部的数组无法装载更多的元素时,对象就需要扩大数组的长度,以便能装入更多的元素

当然Java里的数组是无法自动扩容的,方法是使用一个新的数组代替已有的容量小的数组,就像我们用一个小桶装水,如果想装更多的水,就得换大水桶

Redis 中的扩容设计

HashMap 的扩容需要对集合中大部分的元素进行重新计算,但是对于 redis 这种企业级应用,特别是单线程的应用,如果像传统的 rehash 一样把所有元素来一遍的话,估计要十几秒的时间。

十几秒对于常见的金融、电商等相对高并发的业务场景,是无法忍受的。

那么 redis 的 rehash 是如何实现的呢?

实际上 redis 的 rehash 动作并不是一次性、集中式地完成的, 而是分多次、渐进式地完成的

这里补充一点,不单单是扩容,缩容也是一样的道理,二者都需要进行 rehash。

只增不降就是对内存的浪费,浪费就是犯罪,特别是内存还这么贵。

ps: 这种思想和 key 淘汰有异曲同工之妙,一口吃不了一个大胖子,一次搞不定,那就 1024 次,慢慢来总能解决问题

Redis 的渐进式 rehash

这部分直接选自经典入门书籍《Redis 设计与实现》

1.为什么要渐进式处理?

实际上 redis 内部有两个 hashtable,我们称之为 ht[0] 和 ht[1]。传统的 HashMap 中只有一个。

为了避免 rehash 对服务器性能造成影响, 服务器不是一次性将 ht[0] 里面的所有键值对全部 rehash 到 ht[1] , 而是分多次、渐进式地将 ht[0] 里面的键值对慢慢地 rehash 到 ht[1] 。

2.详细步骤

哈希表渐进式 rehash 的详细步骤:

(1)为 ht[1] 分配空间, 让字典同时持有 ht[0] 和 ht[1] 两个哈希表。

(2)在字典中维持一个索引计数器变量 rehashidx , 并将它的值设置为 0 , 表示 rehash 工作正式开始。

(3)在 rehash 进行期间, 每次对字典执行添加、删除、查找或者更新操作时, 程序除了执行指定的操作以外, 还会顺带将 ht[0] 哈希表在 rehashidx 索引上的所有键值对 rehash 到 ht[1] , 当 rehash 工作完成之后, 程序将 rehashidx 属性的值增1。

(4)随着字典操作的不断执行, 最终在某个时间点上, ht[0] 的所有键值对都会被 rehash 至 ht[1] , 这时程序将 rehashidx 属性的值设为 -1 , 表示 rehash 操作已完成。

渐进式 rehash 的好处在于它采取分而治之的方式, 将 rehash 键值对所需的计算工作均滩到对字典的每个添加、删除、查找和更新操作上, 从而避免了集中式 rehash 而带来的庞大计算量。

3.rehash 间的操作怎么兼容呢?

因为在进行渐进式 rehash 的过程中, 字典会同时使用 ht[0] 和 ht[1] 两个哈希表, 那这期间的操作如何保证正常进行呢?

(1)查询一个信息

这个类似于我们的数据库信息等迁移,先查询一个库,没有的话,再去查询另一个库。

ht[0] 中没找到,我们去 ht[1] 中查询即可。

(2)新数据怎么办?

这个和数据迁移一样的道理。

当我们有新旧的两个系统时,新来的用户等信息直接落在新系统即可,

这一措施保证了 ht[0] 包含的键值对数量会只减不增, 并随着 rehash 操作的执行而最终变成空表。

扩容

1.什么时候判断?

redis 在每次执行 put 操作的时候,就可以检查是否需要扩容。

其实也很好理解,put 插入元素的时候,判断是否需要扩容,然后开始扩容,是直接的一种思路。

留一个思考题:我们可以在其他的时候判断吗?

2.redis 判断是否需要扩容的源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
/* Incremental rehashing already in progress. Return. */
// 如果正在进行渐进式扩容,则返回OK
if (dictIsRehashing(d)) return DICT_OK;

/* If the hash table is empty expand it to the initial size. */
// 如果哈希表ht[0]的大小为0,则初始化字典
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
* elements/buckets is over the "safe" threshold, we resize doubling
* the number of buckets. */
/*
* 如果哈希表ht[0]中保存的key个数与哈希表大小的比例已经达到1:1,即保存的节点数已经大于哈希表大小
* 且redis服务当前允许执行rehash,或者保存的节点数与哈希表大小的比例超过了安全阈值(默认值为5)
* 则将哈希表大小扩容为原来的两倍
*/
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
{
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
}

扩容的条件总结下来就是两句话:

(1)服务器目前没有在执行 BGSAVE/BGREWRITEAOF 命令, 并且哈希表的负载因子大于等于 1;

(2)服务器目前正在执行 BGSAVE/BGREWRITEAOF 命令, 并且哈希表的负载因子大于等于 5;

这里其实体现了作者的一种设计思想:如果负载因子超过5,说明信息已经很多了,管你在不在保存,都要执行扩容,优先保证服务可用性。如果没那么高,那就等持久化完成再做 rehash。

我们自己在实现的时候可以简化一下,比如只考虑情况2。

3.扩容到原来的多少?

知道了什么时候应该开始扩容,但是要扩容到多大也是值得思考的一个问题。

扩容的太小,会导致频繁扩容,浪费性能。

扩容的太大,会导致资源的浪费。

其实这个最好的方案是结合我们实际的业务,不过这部分对用户是透明的。

一般是扩容为原来的两倍。

4.为什么需要扩容?

我们在实现 ArrayList 的时候需要扩容,因为数据放不下了。

我们知道 HashMap 的底层是数组 + 链表(红黑树)的数据结构。

那么会存在放不下的情况吗?

个人理解实际上不会。因为链表可以一直加下去。

那为什么需要扩容呢?

实际上更多的是处于性能的考虑。我们使用 HashMap 就是为了提升性能,如果一直不扩容,可以理解为元素都 hash 到相同的 bucket 上,这时就退化成了一个链表。

这会导致查询等操作性能大大降低。

缩容

1.什么时候判断?

看了前面的扩容,我们比较直观地方式是在用户 remove 元素的时候执行是否需要缩容。

不过 redis 并不完全等同于传统的 HashMap,还有数据的淘汰和过期,这些是对用户透明的。

redis 采用的方式实际上是一个定时任务。

个人理解内存缩容很重要,但是没有那么紧急,我们可以 1min 扫描一次,这样可以节省机器资源。

实际工作中,一般 redis 的内存都是逐步上升的,或者稳定在一个范围内,很少去大批量删除数据。(除非数据搞错了,我就遇到过一次,数据同步错地方了)。

所以数据删除,一般几分钟内给用户一个反馈就行。

知其然,知其所以然。

我们懂得了这个道理也就懂得了为什么有时候删除 redis 的几百万 keys,内存也不是直接降下来的原因。

2.缩容的条件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
* we resize the hash table to save memory */
void tryResizeHashTables(int dbid) {
if (htNeedsResize(server.db[dbid].dict))
dictResize(server.db[dbid].dict);
if (htNeedsResize(server.db[dbid].expires))
dictResize(server.db[dbid].expires);
}

/* Hash table parameters */
#define HASHTABLE_MIN_FILL 10 /* Minimal hash table fill 10% */
int htNeedsResize(dict *dict) {
long long size, used;

size = dictSlots(dict);
used = dictSize(dict);
return (size > DICT_HT_INITIAL_SIZE &&
(used*100/size < HASHTABLE_MIN_FILL));
}

/* Resize the table to the minimal size that contains all the elements,
* but with the invariant of a USED/BUCKETS ratio near to <= 1 */
int dictResize(dict *d)
{
int minimal;

if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
minimal = d->ht[0].used;
if (minimal < DICT_HT_INITIAL_SIZE)
minimal = DICT_HT_INITIAL_SIZE;
return dictExpand(d, minimal);
}

和扩容类似,不过这里的缩容比例不是 5 倍,而是当哈希表保存的key数量与哈希表的大小的比例小于 10% 时需要缩容。

3.缩容到多少?

最简单的方式是直接变为原来的一半,不过这么做有时候也不是那么好用。

redis 是缩容后的大小为第一个大于等于当前key数量的2的n次方。

这个可能不太好理解,举几个数字就懂了:

keys数量 缩容大小
3 4
4 4
5 8
9 16

主要保障以下3点:

(1)缩容之后,要大于等于 key 的数量

(2)尽可能的小,节约内存

(3)2 的倍数。

第三个看过 HashMap 源码讲解的小伙伴应该深有体会。

当然也不能太小,redis 限制的最小为 4。

实际上如果 redis 中只放 4 个 key,实在是杀鸡用牛刀,一般不会这么小。

我们在实现的时候,直接参考 jdk 好了,给个最小值限制 8。

4.为什么需要缩容?

最核心的目的就是为了节约内存,其实还有一个原因,叫 small means fast(小即是快——老马)。

渐进式 ReHash 实现的思考

好了,扩容和缩容就聊到这里,那么这个渐进式 rehash 到底怎么一个渐进法?

1.扩容前

不需要扩容时应该有至少需要初始化两个元素:

1
2
3
4
hashtable[0] = new HashTable(size);
hashIndex=-1;

hashtable[1] = null;

hashtable 中存储着当前的元素信息,hashIndex=-1 标识当前没有在进行扩容。

2.扩容准备

当需要扩容的时候,我们再去创建一个 hashtable[1],并且 size 是原来的 2倍。

1
2
3
4
5
hashtable[0] = new HashTable(size);

hashtable[1] = new HashTable(2 * size);

hashIndex=-1;

主要是为了节约内存,使用惰性初始化的方式创建 hashtable。

3.扩容时

调整 hashIndex=0…size,逐步去 rehash 到新的 hashtable[1]

新的插入全部放入到 hashtable[1]

4.扩容后

扩容后我们应该把 hashtable[0] 的值更新为 hashtable[1],并且释放掉 hashtable[1] 的资源。

并且设置 hashIndex=-1,标识已经 rehash 完成

1
2
3
4
hashtable[0] = hashtable[1];
hashIndex=-1;

hashtable[1] = null;

这样整体的实现思路就已经差不多了,光说不练假把式,我们下一节就来自己实现一个渐进式 rehash 的 HashMap

十.实现渐进式ReHash Map类定义

类定义

1
2
3
4
5
6
7
8
/**
* 自己实现的渐进式 rehash map
* @param <K> key 泛型
* @param <V> value 泛型
* @see HashMap
*/
public class MyProgressiveReHashMap<K,V> extends AbstractMap<K,V> implements Map<K,V> {
}

和简易版本类似。

私有变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
private static final Log log = LogFactory.getLog(MyProgressiveReHashMap.class);

/**
* rehash 的下标
*
* 如果 rehashIndex != -1,说明正在进行 rehash
*/
private int rehashIndex = -1;

/**
* 容量
* 默认为 8
*/
private int capacity;

/**
* 处于 rehash 状态的容量
*/
private int rehashCapacity;

/**
* 统计大小的信息
*/
private int size = 0;

/**
* 阈值
* 阈值=容量*factor
* 暂时不考虑最大值的问题
*
* 当达到这个阈值的时候,直接进行两倍的容量扩充+rehash。
*/
private final double factor = 1.0;

/**
* 用来存放信息的 table 数组。
* 数组:数组的下标是一个桶,桶对应的元素 hash 值相同。
* 桶里放置的是一个链表。
*
* 可以理解为 table 是一个 ArrayList
* arrayList 中每一个元素,都是一个 DoubleLinkedList
*/
private List<List<Entry<K, V>>> table;

/**
* 渐进式 rehash 时,用来存储元素信息使用。
*/
private List<List<Entry<K, V>>> rehashTable;

/**
* 是否开启 debug 模式
*/
private boolean debugMode = false;

rehashIndex/rehashCapacity/rehashTable 这三个值都是我们在进行渐进式实现的时候需要使用的值。

构造器

主要是一些值的初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public MyProgressiveReHashMap() {
this(8);
}

/**
* 初始化 hash map
* @param capacity 初始化容量
*/
public MyProgressiveReHashMap(int capacity) {
this(capacity, false);
}

/**
* 初始化 hash map
* @param capacity 初始化容量
* @param debugMode 是否开启 debug 模式
*/
public MyProgressiveReHashMap(int capacity, boolean debugMode) {
this.capacity = capacity;
// 初始化最大为容量的个数,如果 hash 的非常完美的话。
this.table = new ArrayList<>(capacity);
// 初始化为空列表
for(int i = 0; i < capacity; i++) {
this.table.add(i, new ArrayList<Entry<K, V>>());
}
this.debugMode = debugMode;
this.rehashIndex = -1;
this.rehashCapacity = -1;
this.rehashTable = null;
}

put() 方法

这个方法相对难度比较大:

put() 的过程可以见方法的注释。

需要考虑是否为 rehash 阶段,还需要考虑是否为更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* put 一个值
*
* (1)如果不处于 rehash 阶段
*
* 1.1 判断是否为 table 更新,如果是,则进行更新
* 1.2 如果不是更新,则进行插入
*
* 插入的时候可能触发 rehash
*
* (2)如果处于 rehash 阶段
*
* 2.0 执行一次渐进式 rehash 的动作
*
* 2.1 判断是否为更新,需要遍历 table 和 rehashTable
* 如果是,执行更新
*
* 2.2 如果不是,则执行插入
* 插入到 rehashTable 中
*
* @param key 键
* @param value 值
* @return
*/
@Override
public V put(K key, V value) {
boolean isInRehash = isInReHash();
if(!isInRehash) {
//1. 是否为更新
Pair<Boolean, V> pair = updateTableInfo(key, value, this.table, this.capacity);
if(pair.getValueOne()) {
V oldVal = pair.getValueTwo();
if(debugMode) {
log.debug("不处于渐进式 rehash,此次为更新操作。key: {}, value: {}", key, value);
printTable(this.table);
}
return oldVal;
} else {
// 插入
return this.createNewEntry(key, value);
}
} else {
//2.0 执行一个附加操作,进行渐进式 rehash 处理
if(debugMode) {
log.debug("当前处于渐进式 rehash 阶段,额外执行一次渐进式 rehash 的动作");
}
rehashToNew();
//2.1 是否为 table 更新
Pair<Boolean, V> pair = updateTableInfo(key, value, this.table, this.capacity);
if(pair.getValueOne()) {
V oldVal = pair.getValueTwo();
if(debugMode) {
log.debug("此次为更新 table 操作。key: {}, value: {}", key, value);
printTable(this.table);
}
return oldVal;
}
//2.2 是否为 rehashTable 更新
Pair<Boolean, V> pair2 = updateTableInfo(key, value, this.rehashTable, this.rehashCapacity);
if(pair2.getValueOne()) {
V oldVal = pair2.getValueTwo();
if(debugMode) {
log.debug("此次为更新 rehashTable 操作。key: {}, value: {}", key, value);
printTable(this.table);
}
return oldVal;
}
//2.3 插入
return this.createNewEntry(key, value);
}
}
1.是否为 rehash 阶段

这个实现比较简单,就是判断 rehashIndex 是否为 -1:

1
2
3
4
5
6
7
/**
* 是否处于 rehash 阶段
* @return 是否
*/
private boolean isInReHash() {
return rehashIndex != -1;
}
2.更新列表信息

这里为了复用,对方法进行了抽象。可以同时使用到 table 和 rehashTable 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 是否为更新信息
* @param key key
* @param value value
* @param table table 信息
* @param tableCapacity table 的容量(使用 size 也可以,因为都默认初始化了。)
* @return 更新结果
*/
private Pair<Boolean, V> updateTableInfo(K key, V value, final List<List<Entry<K,V>>> table,
final int tableCapacity) {
// 计算 index 值
int hash = HashUtil.hash(key);
int index = HashUtil.indexFor(hash, tableCapacity);
// 判断是否为替换
List<Entry<K,V>> entryList = new ArrayList<>();
if(index < table.size()) {
entryList = table.get(index);
}
// 遍历
for(Entry<K,V> entry : entryList) {
// 二者的 key 都为 null,或者二者的 key equals()
final K entryKey = entry.getKey();
if(ObjectUtil.isNull(key, entryKey)
|| key.equals(entryKey)) {
final V oldValue = entry.getValue();
// 更新新的 value
entry.setValue(value);
return Pair.of(true, oldValue);
}
}
return Pair.of(false, null);
}

这个和以前基本是类似的。

返回结果时,为了同时保存是否为更新,以及更新的 value 值。所以使用了 Pair 工具类。

3.插入新的元素

插入方法也比较麻烦,需要区分是否处于渐进式 rehash 阶段。还要考虑是否需要扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* 创建一个新的明细
*
* (1)如果处于渐进式 rehash 中,则设置到 rehashTable 中
* (2)如果不是,则判断是否需要扩容
*
* 2.1 如果扩容,则直接放到 rehashTable 中。
* 因为我们每次扩容内存翻倍,一次只处理一个 index 的信息,所以不会直接 rehash 结束,直接放到新的 rehashTable 中即可
* 2.2 如果不扩容,则放入 table 中
*
* @param key key
* @param value value
*/
private V createNewEntry(final K key,
final V value) {
Entry<K,V> entry = new DefaultMapEntry<>(key, value);
// 重新计算 tableIndex
int hash = HashUtil.hash(key);
//是否处于 rehash 中?
if(isInReHash()) {
int index = HashUtil.indexFor(hash, this.rehashCapacity);
List<Entry<K,V>> list = this.rehashTable.get(index);
list.add(entry);
if(debugMode) {
log.debug("目前处于 rehash 中,元素直接插入到 rehashTable 中。");
printTable(this.rehashTable);
}
}
// 是否需要扩容 && 不处于渐进式 rehash
// rehash 一定是扩容 rehashTable
// 如果发生了 rehash,元素是直接放到 rehashTable 中的
if(isNeedExpand()) {
rehash();
// 放入到 rehashTable 中
int index = HashUtil.indexFor(hash, this.rehashCapacity);
List<Entry<K,V>> list = this.rehashTable.get(index);
list.add(entry);
if(debugMode) {
log.debug("目前处于 rehash 中,元素直接插入到 rehashTable 中。");
printTable(this.rehashTable);
}
} else {
int index = HashUtil.indexFor(hash, this.capacity);
List<Entry<K,V>> list = this.table.get(index);
list.add(entry);
if(debugMode) {
log.debug("目前不处于 rehash 中,元素直接插入到 table 中。");
printTable(this.table);
}
}
this.size++;
return value;
}

是否需要扩容的方法也比较简单:

1
2
3
4
5
6
7
8
9
10
11
/**
* 是否需要扩容
*
* 比例满足,且不处于渐进式 rehash 中
* @return 是否
*/
private boolean isNeedExpand() {
// 验证比例
double rate = size*1.0 / capacity*1.0;
return rate >= factor && !isInReHash();
}

不过我们这次添加了一个不要处于渐进式 rehash 过程中。

其中 rehash 的实现也发生了很大的变化,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 直接 rehash 的流程
*
* (1)如果处于 rehash 中,直接返回
* (2)初始化 rehashTable,并且更新 rehashIndex=0;
* (3)获取 table[0],rehash 到 rehashTable 中
* (4)设置 table[0] = new ArrayList();
*/
private void rehash() {
if(isInReHash()) {
if(debugMode) {
log.debug("当前处于渐进式 rehash 阶段,不重复进行 rehash!");
}
return;
}
// 初始化 rehashTable
this.rehashIndex = -1;
this.rehashCapacity = 2*capacity;
this.rehashTable = new ArrayList<>(this.rehashCapacity);
for(int i = 0; i < rehashCapacity; i++) {
rehashTable.add(i, new ArrayList<Entry<K, V>>());
}

// 遍历元素第一个元素,其他的进行渐进式更新。
rehashToNew();
}

渐进式更新的方法,可以在 get/put/remove 等操作时,执行附加操作时使用。

所以单独抽成了一个方法,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 将信息从旧的 table 迁移到新的 table 中
*
* (1)table[rehashIndex] 重新 rehash 到 rehashTable 中
* (2)设置 table[rehashIndex] = new ArrayList();
* (3)判断是否完成渐进式 rehash
*/
private void rehashToNew() {
rehashIndex++;

List<Entry<K, V>> list = table.get(rehashIndex);
for(Entry<K, V> entry : list) {
int hash = HashUtil.hash(entry);
int index = HashUtil.indexFor(hash, rehashCapacity);
// 添加元素
// 获取列表,避免数组越界
List<Entry<K,V>> newList = rehashTable.get(index);
// 添加元素到列表
// 元素不存在重复,所以不需要考虑更新
newList.add(entry);
rehashTable.set(index, newList);
}

// 清空 index 处的信息
table.set(rehashIndex, new ArrayList<Entry<K, V>>());

// 判断大小是否完成 rehash
// 验证是否已经完成
if(rehashIndex == (table.size()-1)) {
this.capacity = this.rehashCapacity;
this.table = this.rehashTable;
this.rehashIndex = -1;
this.rehashCapacity = -1;
this.rehashTable = null;
if(debugMode) {
log.debug("渐进式 rehash 已经完成。");
printTable(this.table);
}p
} else {
if(debugMode) {
log.debug("渐进式 rehash 处理中, 目前 index:{} 已完成", rehashIndex);
printAllTable();
}
}
}

get() 操作

渐进式 rehash 将动作分散到每一个操作中,我们对 get 方法进行重写,当做一个例子。其他的方法如果实现也是类似的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 查询方法
* (1)如果处于渐进式 rehash 状态,额外执行一次 rehashToNew()
* (2)判断 table 中是否存在元素
* (3)判断 rehashTable 中是否存在元素
* @param key key
* @return 结果
*/
@Override
public V get(Object key) {
if(isInReHash()) {
if(debugMode) {
log.debug("当前处于渐进式 rehash 状态,额外执行一次操作");
rehashToNew();
}
}

//1. 判断 table 中是否存在
V result = getValue(key, this.table);
if(result != null) {
return result;
}

//2. 是否处于渐进式 rehash
if(isInReHash()) {
return getValue(key, this.rehashTable);
}
return null;
}

测试

我们历经千辛万苦,终于实现了一个简单版本的渐进式 hash map。

下面来测试一下功能是否符合我们的预期。

1.put 操作
1
2
3
Map<String, String> map = new MyProgressiveReHashMap<>(2, true);
map.put("1", "1");
map.put("1", "2");

日志:

1
2
3
4
[DEBUG] [2020-10-11 21:30:15.072] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.createNewEntry] - 目前不处于 rehash 中,元素直接插入到 table 中。
{1: 1}
[DEBUG] [2020-10-11 21:30:15.076] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.put] - 不处于渐进式 rehash,此次为更新操作。key: 1, value: 2
{1: 2}

第一次是插入,第二次是更新。

这里都没有触发扩容,下面我们看一下触发扩容的情况。

2.扩容测试
1
2
3
4
5
6
7
8
Map<String, String> map = new MyProgressiveReHashMap<>(2, true);
map.put("1", "1");
map.put("2", "2");
map.put("3", "3");

Assert.assertEquals("1", map.get("1"));
Assert.assertEquals("2", map.get("2"));
Assert.assertEquals("3", map.get("3"));

日志如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[DEBUG] [2020-10-11 21:31:12.559] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.createNewEntry] - 目前不处于 rehash 中,元素直接插入到 table 中。
{1: 1}
[DEBUG] [2020-10-11 21:31:12.560] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.createNewEntry] - 目前不处于 rehash 中,元素直接插入到 table 中。
{2: 2}
{1: 1}
[DEBUG] [2020-10-11 21:31:12.563] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.rehashToNew] - 渐进式 rehash 处理中, 目前 index:0 已完成
原始 table 信息:
{1: 1}
新的 table 信息:
{2: 2}
[DEBUG] [2020-10-11 21:31:12.563] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.createNewEntry] - 目前处于 rehash 中,元素直接插入到 rehashTable 中。
{2: 2}
{3: 3}
[DEBUG] [2020-10-11 21:31:12.564] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.get] - 当前处于渐进式 rehash 状态,额外执行一次操作
[DEBUG] [2020-10-11 21:31:12.564] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.rehashToNew] - 渐进式 rehash 已经完成。
{2: 2}
{1: 1}
{3: 3}

当放入元素【3】的时候,已经触发了 rehash。

(1)第一次渐进式 rehash 将 table[0] 的元素 rehash 到了新的节点。

(2)插入的元素直接插入到 rehashTable 中

(3)get 操作时,额外触发一次 rehash,然后所有的 rehash 已经完成。

参考资料

Fluent Interface 流式接口