疯狂的解析网站源码分享(解读疯狂电子版)

大家好,今天来为大家分享疯狂的解析网站源码分享的一些知识点,和解读疯狂电子版的问题解析,大家要是都明白,那么可以忽略,如果不太清楚的话可以看看本篇文章,相信很大概率可以解决您的问题,接下来我们就一起来看看吧!

高性能核心组件穿透的意义

要想成为高手,就要彻底的掌握那些顶级组件、王者组件

王者组件中吸取思想和精华,为大家自己的业务CRUD所用。

唯有如此,才能成为技术王者。

本地缓存的使用场景

场景1:突发性hotkey场景

突发性hotkey导致的分布式缓存性能变差、缓存击穿的场景

什么是热Key

在某段时间内某个key收到的访问次数,显著高于其他key时,我们可以将其称之为热key。

例如,某redis的每秒访问总量为10000,而其中某个key的每秒访问量达到了7000,这种情况下,我们称该key为热key。

热key带来的问题

1.热Key占用大量的RedisCPU时间使其性能变差并影响其它请求;

2.RedisCluster中各node流量不均衡,造成RedisCluster的分布式优势无法被Client利用,

一个分片负载很高,而其它分片十分空闲从而产生读/写热点问题;

3.热Key的请求压力数量超出Redis的承受能力造成缓存击穿,此时大量强求将直接指向后端存储将其打挂并影响到其它业务;

热key出现的典型业务

预期外的访问量抖增,如突然出现的爆款商品,访问量暴涨的热点新闻,直播间某大主播搞活动大量的刷屏点赞。

解决方案

通过分布式计算来探测热点key

分布式计算组件,计算出来之后,并通知集群内其他机器。

其他机器,本地缓存HotKey,

场景2:常规性hotkey场景

部门组织机构数据

人员类型数据

解决方案

本地缓存HotKey,

通过发布订阅解决数据一致性问题

本地缓存的主要技术

Java缓存技术可分为分布式缓存和本地缓存,分布式缓存在后面的100Wqps三级缓存组件中,再细致介绍。

先看本地缓存。

本地缓存的代表技术主要有HashMap,GuavaCache,Caffeine和Encahche。

HashMap

通过Map的底层方式,直接将需要缓存的对象放在内存中。

优点:简单粗暴,不需要引入第三方包,比较适合一些比较简单的场景。

缺点:没有缓存淘汰策略,定制化开发成本高。

GuavaCache

GuavaCache是由Google开源的基于LRU替换算法的缓存技术。

但GuavaCache由于被下面即将介绍的Caffeine全面超越而被取代。

优点:支持最大容量限制,两种过期删除策略(插入时间和访问时间),支持简单的统计功能。

缺点:springboot2和spring5都放弃了对GuavaCache的支持。

Caffeine

Caffeine采用了W-TinyLFU(LUR和LFU的优点结合)开源的缓存技术。

缓存性能接近理论最优,属于是GuavaCache的增强版。

Encache

Ehcache是一个纯java的进程内缓存框架,具有快速、精干的特点。

是hibernate默认的cacheprovider。

优点:

支持多种缓存淘汰算法,包括LFU,LRU和FIFO;

缓存支持堆内缓存,堆外缓存和磁盘缓存;

支持多种集群方案,解决数据共享问题。

说明:本文会分析Caffeine的源码,后面可以对照分析一下GuavaCache的源码。

本地缓存的优缺点

1.快但是量少:访问速度快,但无法进行大数据存储

本地缓存相对于分布式缓存的好处是,由于数据不需要跨网络传输,故性能更好,

但是由于占用了应用进程的内存空间,如Java进程的JVM内存空间,故不能进行大数据量的数据存储。

2.需要解决数据一致性问题:集群的数据更新问题

与此同时,本地缓存只支持被该应用进程访问,一般无法被其他应用进程访问,故在应用进程的集群部署当中,

如果对应的数据库数据,存在数据更新,则需要同步更新不同部署节点的本地缓存的数据来包保证数据一致性,

复杂度较高并且容易出错,如基于rocketmq的发布订阅机制来同步更新各个部署节点。

3.更新低可靠,容易丢失:数据随应用进程的重启而丢失

由于本地缓存的数据是存储在应用进程的内存空间的,所以当应用进程重启时,本地缓存的数据会丢失。

所以对于需要更改然后持久化的数据,需要注意及时保存,否则可能会造成数据丢失。

和缓存相关的几个核心概念

缓存污染

缓存污染,指留存在缓存中的数据,实际不会被再次访问了,但又占据了缓存空间。

换句话说,由于缓存空间有限,热点数据被置换或者驱逐出去了,而一些后面不用到的数据却反而被留下来,从而缓存数据命中率急剧下降

要解决缓存污染的关键点是能识别出热点数据,或者未来更有可能被访问到的数据

换句话说:是要提升缓存数据命中率

缓存命中率

缓存命中率是一个缓存组件是否好用的核心指标之一,而命中率又和缓存组件本身的缓存数据淘汰算法息息相关。

命中:可以直接通过缓存获取到需要的数据。

不命中:无法直接通过缓存获取到想要的数据,需要再次查询数据库或者执行其它的操作。原因可能是由于缓存中根本不存在,或者缓存已经过期。

通常来讲,缓存的命中率越高则表示使用缓存的收益越高,应用的性能越好(响应时间越短、吞吐量越高),抗并发的能力越强。

由此可见,在高并发的互联网系统中,缓存的命中率是至关重要的指标。

而缓存的命中率的提升,和缓存数据淘汰算法,密切相关。

常见的缓存数据淘汰算法

主要的缓存数据淘汰算法(也叫做缓存数据驱逐算法),有三种:

FIFO(Fistinfirstout)先进先出算法如果一个数据最先进入缓存中,则应该最早淘汰掉。LRU(Leastrecentlyused)最近最少使用算法如果数据最近被访问过,那么将来被访问的几率也更高。LFU(Leastfrequentlyused)最近很少使用算法如果一个数据在最近一段时间内使用次数很少,使用频率最低,那么在将来一段时间内被使用的可能性也很小。

参考一下:Redis的8种缓存淘汰策略

从能否解决缓存污染这一维度来分析Redis的8种缓存淘汰策略:

noeviction策略:不会淘汰数据,解决不了。volatile-ttl策略:给数据设置合理的过期时间。当缓存写满时,会淘汰剩余存活时间最短的数据,避免滞留在缓存中,造成污染。volatile-random策略:随机选择数据,无法把不再访问的数据筛选出来,会造成缓存污染。volatile-lru策略:LRU策略只考虑数据的访问时效,对只访问一次的数据,不能很快筛选出来。volatile-lfu策略:LFU策略在LRU策略基础上进行了优化,筛选数据时优先筛选并淘汰访问次数少的数据。allkeys-random策略:随机选择数据,无法把不再访问的数据筛选出来,会造成缓存污染。allkeys-lru策略:LRU策略只考虑数据的访问时效,对只访问一次的数据,不能很快筛选出来。allkeys-lfu策略:LFU策略在LRU策略基础上进行了优化,筛选数据时优先筛选并淘汰访问次数少的数据。

缓存淘汰策略解决缓存污染noeviction策略不能volatile-ttl策略能volatile-random策略不能volatile-lru策略不能volatile-lfu策略能allkeys-random策略不能allkeys-lru策略不能allkeys-lfu策略能

1、FIFO先进先出算法

FIFO(FirstinFirstout)先进先出。可以理解为是一种类似队列的算法实现

算法:

如果一个数据最先进入缓存中,则应该最早淘汰掉。

换句话说:最先进来的数据,被认为在未来被访问的概率也是最低的

因此,当规定空间用尽且需要放入新数据的时候,会优先淘汰最早进来的数据

优点:

最简单、最公平的一种数据淘汰算法,逻辑简单清晰,易于实现

缺点:

这种算法逻辑设计所实现的缓存的命中率是比较低的,因为没有任何额外逻辑能够尽可能的保证常用数据不被淘汰掉

演示:

下面简单演示了FIFO的工作过程,假设存放元素尺寸是3,且队列已满,放置元素顺序如下图所示,当来了一个新的数据“ldy”后,因为元素数量到达了阈值,则首先要进行太淘汰置换操作,然后加入新元素,

操作如图展示:

2、LRU——适用于局部突发流量场景

LRU(TheLeastRecentlyUsed)最近最久未使用算法。

算法:

如果一个数据最近很少被访问到,那么被认为在未来被访问的概率也是最低的,当规定空间用尽且需要放入新数据的时候,会优先淘汰最久未被访问的数据

演示:

下图展示了LRU简单的工作过程,访问时对数据的提前操作,以及数据满且添加新数据的时候淘汰的过程的展示如下:

此处介绍的LRU是有明显的缺点,

如上所述,对于偶发性、周期性的数据没有良好的抵抗力,很容易就造成缓存的污染,影响命中率,

因此衍生出了很多的LRU算法的变种,用以处理这种偶发冷数据突增的场景,

比如:LRU-K、TwoQueues等,目的就是当判别数据为偶发或周期的冷数据时,不会存入空间内,从而降低热数据的淘汰率。

优点:

LRU实现简单,在一般情况下能够表现出很好的命中率,是一个“性价比”很高的算法。

LRU可以有效的对访问比较频繁的数据进行保护,也就是针对热点数据的命中率提高有明显的效果。

LRU局部突发流量场景,对突发性的稀疏流量(sparsebursts)表现很好。

缺点:

在存在周期性的局部热点数据场景,有大概率可能造成缓存污染

最近访问的数据,并不一定是周期性数据,比如把全量的数据做一次迭代,那么LRU会产生较大的缓存污染,因为周期性的局部热点数据,可能会被淘汰。

演进一:LRU-K

下图展示了LRU-K的简单工作过程,

简单理解,LRU中的K是指数据被访问K次,传统LRU与此对比则可以认为传统LRU是LRU-1。

可以看到LRU-K有两个队列,新来的元素先进入到历史访问队列中,该队列用于记录元素的访问次数,采用的淘汰策略是LRU或者FIFO,当历史队列中的元素访问次数达到K的时候,才会进入缓存队列。

演进二:TwoQueues

下图展示了TwoQueues的工作过程,

TwoQueues与LRU-K相比,他也同样是两个队列,不同之处在于,他的队列一个是缓存队列,一个是FIFO队列,

当新元素进来的时候,首先进入FIFO队列,当该队列中的元素被访问的时候,会进入LRU队列,

过程如下:

实际案例

GuavaCache是GoogleGuava工具包中的一个非常方便易用的本地化缓存实现,基于LRU算法实现,支持多种缓存过期策略。由于Guava的大量使用,GuavaCache也得到了大量的应用。

Guava的loadingcache是使用LRU的淘汰策略,但是很多场景,最近的数据不一定热,存在周期性的热点数据,而LRU反而容易把稍旧的周期热数据挤出去,

3、LFU——适用于局部周期性流量场景

LFU(TheLeastFrequentlyUsed)最近很少使用算法,

如果一个数据在最近一段时间内使用次数很少,使用频率最低,那么在将来一段时间内被使用的可能性也很小。

与LRU的区别在于LRU是以时间先后来衡量,LFU是以时间段内的使用次数衡量

算法:

如果一个数据在一定时间内被访问的次数很低,那么被认为在未来被访问的概率也是最低的,

当规定空间用尽且需要放入新数据的时候,会优先淘汰时间段内访问次数最低的数据

演示:

下面描述了LFU的简单工作过程,首先是访问元素增加元素的访问次数,从而提高元素在队列中的位置,降低淘汰优先级,

后面是插入新元素的时候,因为队列已经满了,所以优先淘汰在一定时间间隔内访问频率最低的元素

优点:

LFU适用于局部周期性流量场景,在这个场景下,比LRU有更好的缓存命中率。

在局部周期性流量场景下,LFU是以次数为基准,所以更加准确,自然能有效的保证和提高命中率

缺点:

LFU有几个的缺点:

第一,因为LFU需要记录数据的访问频率,因此需要额外的空间;

第二,它需要给每个记录项维护频率信息,每次访问都需要更新,这是个巨大的开销;

第三,在存在局部突发流量场景下,有大概率可能造成缓存污染,算法命中率会急剧下降,这也是他最大弊端。所以,LFU对突发性的稀疏流量(sparsebursts)是无效的。

why:LFU对突发性的稀疏流量无效呢?

总体来说,LFU按照访问次数或者访问频率取胜,这个次数有一个累计的长周期,导致前期经常访问的数据,访问次数很大,或者说权重很高,

新来的缓存数据,哪怕他是突发热点,但是,新数据的访问次数累计的时间太短,在老人面试,是个矮个子

LFU就想一个企业,有点论资排辈,排斥性新人,新人进来,都需要吃苦头,哪怕他是明日之星

所以,LFU算法中,老的记录已经占用了缓存,过去的一些大量被访问的记录,在将来不一定会继续是热点数据,但是就一直把“坑”占着了,而那些偶然的突破热点数据,不太可能会被保留下来,而是被淘汰。

所以,在存在突发性的稀疏流量下,LFU中的偶然的、稀疏的突发流量在访问频率上,不占优势,很容易被淘汰,造成缓存污染和未来缓存命中率下降。

LRU和LFU的对比

LRU实现简单,在一般情况下能够表现出很好的命中率,是一个“性价比”很高的算法,平时也很常用。

LRU的优点之一对突发性的稀疏流量(sparsebursts)表现很好。

但是,LRU这个优点也带来一个缺点:

对于周期性的局部热点数据,有大概率可能造成缓存污染

最近访问的数据,并不一定是周期性数据,比如把全量的数据做一次迭代,那么LRU会产生较大的缓存污染,因为周期性的数据,可能会被淘汰。

如果是周期性局部热点数据,那么LFU可以达到最高的命中率。

但是LFU有仨个大的缺点:

第一,因为LFU需要记录数据的访问频率,因此需要额外的空间;

第二,它需要给每个记录项维护频率信息,每次访问都需要更新,这是个巨大的时间开销;

第三,对突发性的局部热点数据/稀疏流量(sparsebursts),算法命中率会急剧下降,这也是他最大弊端。

无论LRU还是LFU都有其各自的缺点,不过,现在已经有很多针对其缺点而改良、优化出来的变种算法。

4、TinyLFU

TinyLFU就是其中一个优化算法,它是专门为了解决LFU上述提到的三个问题而被设计出来的。

第1:如何减少访问频率的保存,所带来的空间开销

第2:如何减少访问记录的更新,所带来的时间开销

第3:如果提升对局部热点数据的算法命中率

解决第1个问题/第2个问题是采用了Count–MinSketch算法。

解决第二个问题是让老的访问记录,尽量保证“新鲜度”(FreshnessMechanism)

首先:如何解决访问频率维护的时间开销和空间开销

解决措施:使用Count-MinSketch算法存储访问频率,极大的节省空间;并且减少hash碰撞。

关于Count-MinSketch算法,可以看作是布隆过滤器的同源的算法,

假如我们用一个hashmap来存储每个元素的访问次数,那这个量级是比较大的,并且hash冲突的时候需要做一定处理,否则数据会产生很大的误差,

如果用hashmap的方式,相同的下标变成链表,这种方式会占用很大的内存,而且速度也不是很快。

其实一个hash函数会冲突是比较低的,布隆过滤器的优化之一,设置多个hash函数,多个hash函数,个个都冲突的概率就微乎其微了。

Count-MinSketch算法将一个hash操作,扩增为多个hash,这样原来hash冲突的概率就降低了几个等级,且当多个hash取得数据的时候,取最低值,也就是CountMin的含义所在。

Sketch是草图、速写的意思。

将要介绍的Count–MinSketch的原理跟BloomFilter一样,只不过BloomFilter只有0和1的值,那么你可以把Count–MinSketch看作是“数值”版的BloomFilter。

布隆过滤器原理

布隆过滤器是由一个固定大小的二进制向量或者位图(bitmap)和一系列映射函数组成的。

在初始状态时,对于长度为m的位数组,它的所有位都被置为0,如下图所示:

当有变量被加入集合时,通过K个映射函数将这个变量映射成位图中的K个点,把它们置为1。

查询某个变量的时候我们只要看看这些点是不是都是1就可以大概率知道集合中有没有它了

如果这些点有任何一个0,则被查询变量一定不在;如果都是1,则被查询变量很可能存在。为什么说是可能存在,而不是一定存在呢?那是因为映射函数本身就是散列函数,散列函数是会有碰撞的。

误判率:布隆过滤器的误判是指多个输入经过哈希之后在相同的bit位置1了,这样就无法判断究竟是哪个输入产生的,因此误判的根源在于相同的bit位被多次映射且置1。这种情况也造成了布隆过滤器的删除问题,因为布隆过滤器的每一个bit并不是独占的,很有可能多个元素共享了某一位。如果我们直接删除这一位的话,会影响其他的元素。

特性

一个元素如果判断结果为存在的时候元素不一定存在,但是判断结果为不存在的时候则一定不存在。布隆过滤器可以添加元素,但是不能删除元素。因为删掉元素会导致误判率增加。

Count-MinSketch算法原理

下图展示了Count-MinSketch算法简单的工作原理:

假设有四个hash函数,每当元素被访问时,将进行次数加1;此时会按照约定好的四个hash函数进行hash计算找到对应的位置,相应的位置进行+1操作;当获取元素的频率时,同样根据hash计算找到4个索引位置;取得四个位置的频率信息,然后根据CountMin取得最低值作为本次元素的频率值返回,即Min(Count);

Count-MinSketch算法详细实现方案如下:

如何进行Count-MinSketch访问次数的空间开销?

用4个hash函数会存访问次数,那空间就是4倍了。怎么优化呢

解决办法是:

访问次数超过15次其实是很热的数据了,没必要存太大的数字。所以我们用4位就可以存到15了。

一个long有64位,可以存16个4位。

一个访问次数占4个位,一个long有64位,可以存16个访问次数,4个访问一次一组的话,一个long可以分为4组。

一个key对应到4个hash值,也就是4个访问次数,那么,一个long可以分为存储4个Key的访问次数。

最终,一个long对应的数组大小其实是容量的4倍了。

其次,如果提升对局部热点数据的算法命中率

答案是,保鲜机制

为了让缓存保证“新鲜度”,

可以简单理解为:访问次数太大,越不新鲜,适当降低,保证新鲜度。

所以,Caffeine有一个FreshnessMechanism,

Caffeine会剔除掉过往频率很高,但之后不经常的缓存,。

做法很简答,

就是当整体的统计计数(当前所有记录的频率统计之和,这个数值内部维护)达到某一个值时,那么所有记录的频率统计除以2。

TinyLFU的算法流程

TinyLFU’sarchitectureisillustratedinFigure1.

Here,thecacheevictionpolicypicksacachevictim,whileTinyLFUdecidesifreplacingthecachevictimwiththenewitemisexpectedtoincreasethehit-ratio.

当缓存空间不够的时候,TinyLFU找到要淘汰的元素(thecachevictim),也就是使用频率最小的元素,

然后TinyLFU决定将新元素放入缓存,替代将要淘汰的元素(thecachevictim)

具体的流程如下:

5、W-TinyLFU

Caffeine通过测试发现TinyLFU在面对突发性的稀疏流量(sparsebursts)时表现很差,

why?

因为新的记录(newitems)还没来得及建立足够的频率就被剔除出去了,这就使得命中率下降。

W-TinyLFU是如何演进过来的呢?

首先W-TinyLFU看名字就能大概猜出来,它是LFU的变种,也是TinyLFU的变种,当然,也是一种缓存淘汰算法。

W-TinyLFU是如何演进过来的呢?

前面讲到:

LRU能很好的处理局部突发流量LFU能很好的处理局部周期流量

so,取其精华去其糟粕,结合二者的优点

W-TinyLFU=LRU+LFU

当然,总是有个是大股东,这就是LFU,或者说是TinyLFU

so:W-TinyLFU(WindowTinyLeastFrequentlyUsed)是对TinyLFU的的优化和加强,加入LRU以应对局部突发流量,从而实现缓存命中率的最优。

W-TinyLFU的数据架构

W-TinyLFU是怎么引入LRU的呢?他增加了一个W-LRU窗口队列的组件。

当一个数据进来的时候,会进行筛选比较,进入W-LRU窗口队列,经过淘汰后进入Count-MinSketch算法过滤器,通过访问访问频率判决,是否进入缓存。

W-TinyLFU的设计如下所示:

W-LRU窗口队列用于应对局部突发流量TinyLFU用于应对局部周期流量

如果一个数据最近被访问的次数很低,那么被认为在未来被访问的概率也是最低的,当规定空间用尽的时候,会优先淘汰最近访问次数很低的数据;

进一步的分治和解耦

W-TinyLFU将缓存存储空间分为两个大的区域:WindowCache和MainCache,

WindowCache是一个标准的LRUCache,MainCache则是一个SLRU(SegmemtedLRU)cache,

MainCache进一步划分为ProtectedCache(保护区)和ProbationCache(考察区)两个区域,这两个区域都是基于LRU的Cache。

Protected是一个受保护的区域,该区域中的缓存项不会被淘汰。

而且经过实验发现当window区配置为总容量的1%,剩余的99%当中的80%分给protected区,20%分给probation区时,这时整体性能和命中率表现得最好,所以Caffeine默认的比例设置就是这个。

不过这个比例Caffeine会在运行时根据统计数据(statistics)去动态调整,如果你的应用程序的缓存随着时间变化比较快的话,或者说具备的突发特点数据多,那么增加window区的比例可以提高命中率,

如果周期性热地数据多,缓存都是比较固定不变的话,增加MainCache区(protected区+probation区)的比例会有较好的效果。

W-TinyLFU的算法流程

当window区满了,就会根据LRU把candidate(即淘汰出来的元素)放到Probation区域,

Probation区域则是一个观察区,当有新的缓存项需要进入Probation区时,

如果Probation区空间已满,则会将新进入的缓存项与Probation区中根据LRU规则需要被淘汰(victim)的缓存项进行比较,两个进行“PK”,胜者留在probation,输者就要被淘汰了。

TinyLFU写入机制为:

当有新的缓存项写入缓存时,会先写入WindowCache区域,当WindowCache空间满时,最旧的缓存项会被移出WindowCache。

如果ProbationCache未满,从WindowCache移出的缓存项会直接写入ProbationCache;

如果ProbationCache已满,则会根据TinyLFU算法确定从WindowCache移出的缓存项是丢弃(淘汰)还是写入ProbationCache。

ProbationCache中的缓存项如果访问频率达到一定次数,会提升到ProtectedCache;

如果ProtectedCache也满了,最旧的缓存项也会移出ProtectedCache,然后根据TinyLFU算法确定是丢弃(淘汰)还是写入ProbationCache。

TinyLFU淘汰机制为:

从WindowCache或ProtectedCache移出的缓存项称为Candidate,ProbationCache中最旧的缓存项称为Victim。

如果Candidate缓存项的访问频率大于Victim缓存项的访问频率,则淘汰掉Victim。

如果Candidate小于或等于Victim的频率,那么如果Candidate的频率小于5,则淘汰掉Candidate;否则,则在Candidate和Victim两者之中随机地淘汰一个。

从上面对W-TinyLFU的原理描述可知,

caffeine综合了LFU和LRU的优势,将不同特性的缓存项存入不同的缓存区域,最近刚产生的缓存项进入Window区,不会被淘汰;

访问频率高的缓存项进入Protected区,也不会淘汰;

介于这两者之间的缓存项存在Probation区,当缓存空间满了时,Probation区的缓存项会根据访问频率判断是保留还是淘汰;

通过这种机制,很好的平衡了访问频率和访问时间新鲜程度两个维度因素,尽量将新鲜的访问频率高的缓存项保留在缓存中。

同时在维护缓存项访问频率时,引入计数器饱和和衰减机制,即节省了存储资源,也能较好的处理稀疏流量、短时超热点流量等传统LRU和LFU无法很好处理的场景。

W-TinyLFU的优点:

使用Count-MinSketch算法存储访问频率,极大的节省空间;

TinyLFU会定期进行新鲜度提升、访问次数的衰减操作,应对访问模式变化;

并且使用W-LRU机制能够尽可能避免缓存污染的发生,在过滤器内部会进行筛选处理,避免低频数据置换高频数据。

W-TinyLFU的缺点:

目前已知应用于CaffeineCache组件里,应用不是很多。

W-TinyLFU与JVM分代内存关联的想通之处

在caffeine所有的数据都在ConcurrentHashMap中,这个和guavacache不同,guavacache是自己实现了个类似ConcurrentHashMap的结构。

与JVM分代内存类似,在caffeine中有三个记录引用的LRU队列:

Eden队列=window:

在caffeine中规定只能为缓存容量的%1,

假如:size=100,那Eden队列的有效大小就等于1。

Eden的作用:记录的是新到的数据,防止突发流量由于之前没有访问频率,而导致被淘汰。

比如有一部新剧上线,在最开始其实是没有访问频率的,

防止上线之后被其他缓存淘汰出去,而加入这个区域。

可以理解为,伊甸园区,保留的刚刚诞生的未年轻人,防止没有长大,直接被大人干死了。

Eden队列满了之后,淘汰的叫做candidate候选人,进入Probation队列

Probation队列:

可以叫做考察队列,类似于surviver区

在这个队列就代表寿命开始延长,如果Eden队列队列已经满了,那种最近没有被访问过的元素,数据相对比较冷,第一轮淘汰,进行Probation队列做临时的考察。

如果经常了一段时间的积累,访问的频率还是不高,将被进入到Protected队列。

Probation队列满了之后,淘汰的叫做victim牺牲品,进入Protected队列

Protected队列:

可以叫做保护队列,类似于老年代

到了这个队列中,意味着已经取,你暂时不会被淘汰,

但是别急,如果Probation队列没有数据,或者Protected数据满了,你也将会被面临淘汰的尴尬局面。

当然想要变成这个队列,需要把Probation访问一次之后,就会提升为Protected队列。

这个有效大小为(size减去eden)X80%如果size=100,就会是79。

这三个队列关系如下:

所有的新数据都会进入Eden。Eden满了,淘汰进入Probation。

如果在Probation中访问了其中某个数据,如果考察区空间不够,则这个数据升级为Protected。

如果Protected满了,又会继续降级为Probation。

对于发生数据淘汰的时候,会从Eden中选择候选人,和Probation中victim进行淘汰pk。

会把Probation队列中的数据队头称为受害者,这个队头肯定是最早进入Probation的,按照LRU队列的算法的话那他其实他就应该被淘汰,但是在这里只能叫他受害者,Probation是考察队列,代表马上要给他行刑了。

Eden中选择候选人,也是Probation中队尾元素,也叫攻击者。这里受害者会和攻击者皇城PK决出我们应该被淘汰的。

通过我们的Count-MinSketch中的记录的频率数据有以下几个判断:

如果candidateFreq大于victimFreq,那么受害者就直接被淘汰。

如果candidateFreq大于>=6,那么随机淘汰。

如果candidateFreq大于<6,淘汰candidate,留下victimKey

Java本地缓存使用实操

基于HashMap实现LRU

通过Map的底层方式,直接将需要缓存的对象放在内存中。

优点:简单粗暴,不需要引入第三方包,比较适合一些比较简单的场景。缺点:没有缓存淘汰策略,定制化开发成本高。

packagecom.crazymakercircle.cache;\n\nimportcom.crazymakercircle.util.Logger;\nimportorg.junit.Test;\n\nimportjava.util.HashMap;\nimportjava.util.LinkedHashMap;\nimportjava.util.Map;\n\npublicclassLruDemo{\n\n\n@Test\npublicvoidtestSimpleLRUCache(){\n\nSimpleLRUCachecache=newSimpleLRUCache(2/*缓存容量*/);\ncache.put(1,1);\ncache.put(2,2);\nLogger.cfo(cache.get(1));//返回1\ncache.put(3,3);//该操作会使得2淘汰\nLogger.cfo(cache.get(2));//返回-1(未找到)\ncache.put(4,4);//该操作会使得1淘汰\nLogger.cfo(cache.get(1));//返回-1(未找到)\nLogger.cfo(cache.get(3));//返回3\nLogger.cfo(cache.get(4));//返回4\n}\n\n@Test\npublicvoidtestLRUCache(){\n\nLRUCachecache=newLRUCache(2/*缓存容量*/);\ncache.put(1,1);\ncache.put(2,2);\nLogger.cfo(cache.get(1));//返回1\ncache.put(3,3);//该操作会使得2淘汰\nLogger.cfo(cache.get(2));//返回-1(未找到)\ncache.put(4,4);//该操作会使得1淘汰\nLogger.cfo(cache.get(1));//返回-1(未找到)\nLogger.cfo(cache.get(3));//返回3\nLogger.cfo(cache.get(4));//返回4\n}\n\n\n\nstaticclassSimpleLRUCacheextendsLinkedHashMap<Integer,Integer>{\nprivateintcapacity;\n\npublicSimpleLRUCache(intcapacity){\nsuper(capacity,0.75F,true);\nthis.capacity=capacity;\n}\n\npublicintget(intkey){\nreturnsuper.getOrDefault(key,-1);\n}\n\npublicvoidput(intkey,intvalue){\nsuper.put(key,value);\n}\n\n@Override\nprotectedbooleanremoveEldestEntry(Map.Entry<Integer,Integer>eldest){\nreturnsize()>capacity;\n}\n}\n\nstaticprivateclassEntry{\nprivateintkey;\nprivateintvalue;\nprivateEntrybefore;\nprivateEntryafter;\n\npublicEntry(){\n}\n\npublicEntry(intkey,intvalue){\nthis.key=key;\nthis.value=value;\n}\n}\n\nstaticclassLRUCache{\n//map容器,空间换时间,保存key对应的CacheNode,保证用O(1)的时间获取到value\nprivateMap<Integer,Entry>cacheMap=newHashMap<Integer,Entry>();\n//最大容量\nprivateintcapacity;\n/**\n*通过双向指针来保证数据的插入更新顺序,以及队尾淘汰机制\n*/\n//头指针\nprivateEntryhead;\n//尾指针\nprivateEntrytail;\n\n//容器大小\nprivateintsize;\n\n\n/**\n*初始化双向链表,容器大小\n*/\npublicLRUCache(intcapacity){\nthis.capacity=capacity;\nhead=newEntry();\ntail=newEntry();\nhead.after=tail;\ntail.before=head;\n}\n\npublicintget(intkey){\nEntrynode=cacheMap.get(key);\nif(node==null){\nreturn-1;\n}\n//node!=null,返回node后需要把访问的node移动到双向链表头部\nmoveToHead(node);\nreturnnode.value;\n}\n\npublicvoidput(intkey,intvalue){\nEntrynode=cacheMap.get(key);\nif(node==null){\n//缓存不存在就新建一个节点,放入Map以及双向链表的头部\nEntrynewNode=newEntry(key,value);\ncacheMap.put(key,newNode);\naddToHead(newNode);\nsize++;\n//如果超出缓存容器大小,就移除队尾元素\nif(size>capacity){\nEntryremoveNode=removeTail();\ncacheMap.remove(removeNode.key);\nsize–;\n}\n}else{\n//如果已经存在,就把node移动到头部。\nnode.value=value;\nmoveToHead(node);\n}\n}\n\n/**\n*移动节点到头部:\n*1、删除节点\n*2、把节点添加到头部\n*/\nprivatevoidmoveToHead(Entrynode){\nremoveNode(node);\naddToHead(node);\n}\n\n/**\n*移除队尾元素\n*/\nprivateEntryremoveTail(){\nEntrynode=tail.before;\nremoveNode(node);\nreturnnode;\n}\n\nprivatevoidremoveNode(Entrynode){\n\nnode.before.after=node.after;\nnode.after.before=node.before;\n}\n\n/**\n*把节点添加到头部\n*/\nprivatevoidaddToHead(Entrynode){\nhead.after.before=node;\nnode.after=head.after;\nhead.after=node;\nnode.before=head;\n}\n}\n}\n

GuavaCache使用案例

GuavaCache是由Google开源的基于LRU替换算法的缓存技术。

但GuavaCache由于被下面即将介绍的Caffeine全面超越而被取代,因此不特意编写示例代码了,有兴趣的读者可以访问GuavaCache主页。

优点:支持最大容量限制,两种过期删除策略(插入时间和访问时间),支持简单的统计功能。缺点:springboot2和spring5都放弃了对GuavaCache的支持。

Caffeine使用案例

Caffeine采用了W-TinyLFU(LUR和LFU的优点结合)开源的缓存技术。缓存性能接近理论最优,属于是GuavaCache的增强版。

packagecom.github.benmanes.caffeine.demo;\n\nimportcom.github.benmanes.caffeine.cache.*;\nimportorg.checkerframework.checker.nullness.qual.NonNull;\nimportorg.checkerframework.checker.nullness.qual.Nullable;\n\nimportjava.util.concurrent.TimeUnit;\n\npublicclassDemo1{\nstaticSystem.Loggerlogger=System.getLogger(Demo1.class.getName());\n\npublicstaticvoidhello(String[]args){\nSystem.out.println(&34;+args);\n}\n\n\npublicstaticvoidmain(String…args)throwsException{\nCache<String,String>cache=Caffeine.newBuilder()\n//最大个数限制\n//最大容量1024个,超过会自动清理空间\n.maximumSize(1024)\n//初始化容量\n.initialCapacity(1)\n//访问后过期(包括读和写)\n//5秒没有读写自动删除\n.expireAfterAccess(5,TimeUnit.SECONDS)\n//写后过期\n.expireAfterWrite(2,TimeUnit.HOURS)\n//写后自动异步刷新\n.refreshAfterWrite(1,TimeUnit.HOURS)\n//记录下缓存的一些统计数据,例如命中率等\n.recordStats()\n.removalListener(((key,value,cause)->{\n//清理通知key,value==>键值对cause==>清理原因\nSystem.out.println(&34;+key);\n}))\n//使用CacheLoader创建一个LoadingCache\n.build(newCacheLoader<String,String>(){\n//同步加载数据\n@Nullable\n@Override\npublicStringload(@NonNullStringkey)throwsException{\nSystem.out.println(&34;+key);\nreturn&34;+key;\n}\n\n//异步加载数据\n@Nullable\n@Override\npublicStringreload(@NonNullStringkey,@NonNullStringoldValue)throwsException{\nSystem.out.println(&34;+key);\nreturn&34;+key;\n}\n});\n\n//添加值\ncache.put(&34;,&34;);\ncache.put(&34;,&34;);\n\n//获取值\n@NullableStringvalue=cache.getIfPresent(&34;);\nSystem.out.println(&34;+value);\n//remove\ncache.invalidate(&34;);\nvalue=cache.getIfPresent(&34;);\nSystem.out.println(&34;+value);\n}\n}\n

Encache使用案例

Ehcache是一个纯java的进程内缓存框架,具有快速、精干的特点。是hibernate默认的cacheprovider。

优点:支持多种缓存淘汰算法,包括LFU,LRU和FIFO;缓存支持堆内缓存,堆外缓存和磁盘缓存;支持多种集群方案,解决数据共享问题。缺点:性能比Caffeine差

packagecom.crazymakercircle.cache;\n\nimportcom.crazymakercircle.im.common.bean.User;\nimportcom.crazymakercircle.util.IOUtil;\nimportnet.sf.ehcache.Cache;\nimportnet.sf.ehcache.CacheManager;\nimportnet.sf.ehcache.Element;\nimportnet.sf.ehcache.config.CacheConfiguration;\n\nimportjava.io.InputStream;\n\nimportstaticcom.crazymakercircle.util.IOUtil.getResourcePath;\n\npublicclassEhcacheDemo{\npublicstaticvoidmain(String[]args){\n\n//1.创建缓存管理器\nStringinputStream=getResourcePath(&34;);\nCacheManagercacheManager=CacheManager.create(inputStream);\n\n//2.获取缓存对象\nCachecache=cacheManager.getCache(&34;);\nCacheConfigurationconfig=cache.getCacheConfiguration();\nconfig.setTimeToIdleSeconds(60);\nconfig.setTimeToLiveSeconds(120);\n\n//3.创建元素\nElementelement=newElement(&34;,&34;);\n\n//4.将元素添加到缓存\ncache.put(element);\n\n//5.获取缓存\nElementvalue=cache.get(&34;);\nSystem.out.println(&34;+value);\nSystem.out.println(value.getObjectValue());\n\n//6.删除元素\ncache.remove(&34;);\n\nUseruser=newUser(&34;,&34;);\nElementelement2=newElement(&34;,user);\ncache.put(element2);\nElementvalue2=cache.get(&34;);\nSystem.out.println(&34;+value2);\nUseruser2=(User)value2.getObjectValue();\nSystem.out.println(user2);\n\nSystem.out.println(cache.getSize());\n\n//7.刷新缓存\ncache.flush();\n\n//8.关闭缓存管理器\ncacheManager.shutdown();\n\n}\n}publicclassEncacheTest{\npublicstaticvoidmain(String[]args)throwsException{\n//声明一个cacheBuilder\nCacheManagercacheManager=CacheManagerBuilder.newCacheManagerBuilder().withCache(&34;,CacheConfigurationBuilder//声明一个容量为20的堆内缓存\n.newCacheConfigurationBuilder(String.class,String.class,ResourcePoolsBuilder.heap(20))).build(true);\n//获取Cache实例\nCache<String,String>myCache=cacheManager.getCache(&34;,String.class,String.class);\n//写缓存\nmyCache.put(&34;,&34;);\n//读缓存\nStringvalue=myCache.get(&34;);\n//移除换粗\ncacheManager.removeCache(&34;);\ncacheManager.close();\n}\n}\n

caffeine的使用实操

在pom.xml中添加caffeine依赖:

<dependency>\n<groupId>com.github.ben-manes.caffeine</groupId>\n<artifactId>caffeine</artifactId>\n<version>2.5.5</version>\n</dependency>\n

创建一个Caffeine缓存(类似一个map):

Cache<String,Object>manualCache=Caffeine.newBuilder()\n.expireAfterWrite(10,TimeUnit.MINUTES)\n.maximumSize(10_000)\n.build();\n

常见用法:

publicstaticvoidmain(String…args)throwsException{\nCache<String,String>cache=Caffeine.newBuilder()\n//最大个数限制\n//最大容量1024个,超过会自动清理空间\n.maximumSize(1024)\n//初始化容量\n.initialCapacity(1)\n//访问后过期(包括读和写)\n//5秒没有读写自动删除\n.expireAfterAccess(5,TimeUnit.SECONDS)\n//写后过期\n.expireAfterWrite(2,TimeUnit.HOURS)\n//写后自动异步刷新\n.refreshAfterWrite(1,TimeUnit.HOURS)\n//记录下缓存的一些统计数据,例如命中率等\n.recordStats()\n.removalListener(((key,value,cause)->{\n//清理通知key,value==>键值对cause==>清理原因\nSystem.out.println(&34;+key);\n}))\n//使用CacheLoader创建一个LoadingCache\n.build(newCacheLoader<String,String>(){\n//同步加载数据\n@Nullable\n@Override\npublicStringload(@NonNullStringkey)throwsException{\nSystem.out.println(&34;+key);\nreturn&34;+key;\n}\n\n//异步加载数据\n@Nullable\n@Override\npublicStringreload(@NonNullStringkey,@NonNullStringoldValue)throwsException{\nSystem.out.println(&34;+key);\nreturn&34;+key;\n}\n});\n\n//添加值\ncache.put(&34;,&34;);\ncache.put(&34;,&34;);\n\n//获取值\n@NullableStringvalue=cache.getIfPresent(&34;);\nSystem.out.println(&34;+value);\n//remove\ncache.invalidate(&34;);\nvalue=cache.getIfPresent(&34;);\nSystem.out.println(&34;+value);\n}\n\n

参数方法:

initialCapacity(1)初始缓存长度为1;maximumSize(100)最大长度为100;expireAfterWrite(1,TimeUnit.DAYS)设置缓存策略在1天未写入过期缓存。

过期策略

在Caffeine中分为两种缓存,一个是有界缓存,一个是无界缓存,无界缓存不需要过期并且没有界限。

在有界缓存中提供了三个过期API:

expireAfterWrite:代表着写了之后多久过期。(上面列子就是这种方式)expireAfterAccess:代表着最后一次访问了之后多久过期。expireAfter:在expireAfter中需要自己实现Expiry接口,这个接口支持create、update、以及access了之后多久过期。

注意这个API和前面两个API是互斥的。这里和前面两个API不同的是,需要你告诉缓存框架,它应该在具体的某个时间过期,也就是通过前面的重写create、update、以及access的方法,获取具体的过期时间。

填充(Population)特性

填充特性是指如何在key不存在的情况下,如何创建一个对象进行返回,主要分为下面四种

1手动(Manual)

publicstaticvoidmain(String…args)throwsException{\nCache<String,Integer>cache=Caffeine.newBuilder().build();\n\nIntegerage1=cache.getIfPresent(&34;);\nSystem.out.println(age1);\n\n//当key不存在时,会立即创建出对象来返回,age2不会为空\nIntegerage2=cache.get(&34;,k->{\nSystem.out.println(&34;+k);\nreturn18;\n});\nSystem.out.println(age2);\n}\nnull\nk:张三\n18\n

Cache接口允许显式的去控制缓存的检索,更新和删除。

我们可以通过cache.getIfPresent(key)方法来获取一个key的值,通过cache.put(key,value)方法显示的将数控放入缓存,但是这样子会覆盖缓原来key的数据。更加建议使用cache.get(key,k->value)的方式,get方法将一个参数为key的Function(createExpensiveGraph)作为参数传入。如果缓存中不存在该键,则调用这个Function函数,并将返回值作为该缓存的值插入缓存中。get方法是以阻塞方式执行调用,即使多个线程同时请求该值也只会调用一次Function方法。这样可以避免与其他线程的写入竞争,这也是为什么使用get优于getIfPresent的原因。

注意:如果调用该方法返回NULL(如上面的createExpensiveGraph方法),则cache.get返回null,如果调用该方法抛出异常,则get方法也会抛出异常。

可以使用Cache.asMap()方法获取ConcurrentMap进而对缓存进行一些更改。

2自动(Loading)

publicstaticvoidmain(String…args)throwsException{\n\n//此时的类型是LoadingCache不是Cache\nLoadingCache<String,Integer>cache=Caffeine.newBuilder().build(key->{\nSystem.out.println(&34;+key);\nreturn18;\n});\n\nIntegerage1=cache.getIfPresent(&34;);\nSystem.out.println(age1);\n\n//key不存在时会根据给定的CacheLoader自动装载进去\nIntegerage2=cache.get(&34;);\nSystem.out.println(age2);\n}\nnull\n自动填充:张三\n18\n

3异步手动(AsynchronousManual)

publicstaticvoidmain(String…args)throwsException{\nAsyncCache<String,Integer>cache=Caffeine.newBuilder().buildAsync();\n\n//会返回一个future对象,调用future对象的get方法会一直卡住直到得到返回,和多线程的submit一样\nCompletableFuture<Integer>ageFuture=cache.get(&34;,name->{\nSystem.out.println(&34;+name);\nreturn18;\n});\n\nIntegerage=ageFuture.get();\nSystem.out.println(&34;+age);\n}\nname:张三\nage:18\n

4异步自动(AsynchronouslyLoading)

publicstaticvoidmain(String…args)throwsException{\n//和1.4基本差不多\nAsyncLoadingCache<String,Integer>cache=Caffeine.newBuilder().buildAsync(name->{\nSystem.out.println(&34;+name);\nreturn18;\n});\nCompletableFuture<Integer>ageFuture=cache.get(&34;);\n\nIntegerage=ageFuture.get();\nSystem.out.println(&34;+age);\n}\n

refresh刷新策略

何为更新策略?就是在设定多长时间后会自动刷新缓存。

Caffeine提供了refreshAfterWrite()方法,来让我们进行写后多久更新策略:

LoadingCache<String,String>build=CacheBuilder.newBuilder().refreshAfterWrite(1,TimeUnit.DAYS)\n.build(newCacheLoader<String,String>(){\n@Override\npublicStringload(Stringkey){\nreturn&34;;\n}\n});\n}\n

上面的代码我们需要建立一个CacheLodaer来进行刷新,这里是同步进行的,可以通过buildAsync方法进行异步构建。

在实际业务中这里可以把我们代码中的mapper传入进去,进行数据源的刷新。

但是实际使用中,你设置了一天刷新,但是一天后你发现缓存并没有刷新。

这是因为只有在1天后这个缓存再次访问后才能刷新,如果没人访问,那么永远也不会刷新。

我们来看看自动刷新是怎么做的呢?

自动刷新只存在读操作之后,也就是我们的afterRead()这个方法,其中有个方法叫refreshIfNeeded,它会根据你是同步还是异步然后进行刷新处理。

驱逐策略(eviction)

Caffeine提供三类驱逐策略:基于大小(size-based),基于时间(time-based)和基于引用(reference-based)。

基于大小(size-based)

基于大小驱逐,有两种方式:一种是基于缓存大小,一种是基于权重。

//Evictbasedonthenumberofentriesinthecache\n//根据缓存的计数进行驱逐\nLoadingCache<Key,Graph>graphs=Caffeine.newBuilder()\n.maximumSize(10_000)\n.build(key->createExpensiveGraph(key));\n\n//Evictbasedonthenumberofverticesinthecache\n//根据缓存的权重来进行驱逐(权重只是用于确定缓存大小,不会用于决定该缓存是否被驱逐)\nLoadingCache<Key,Graph>graphs=Caffeine.newBuilder()\n.maximumWeight(10_000)\n.weigher((Keykey,Graphgraph)->graph.vertices().size())\n.build(key->createExpensiveGraph(key));\n

我们可以使用Caffeine.maximumSize(long)方法来指定缓存的最大容量。当缓存超出这个容量的时候,会使用WindowTinyLfu策略来删除缓存。

我们也可以使用权重的策略来进行驱逐,可以使用Caffeine.weigher(Weigher)函数来指定权重,使用Caffeine.maximumWeight(long)函数来指定缓存最大权重值。

maximumWeight与maximumSize不可以同时使用。

基于时间(Time-based)

//Evictbasedonafixedexpirationpolicy\n//基于固定的到期策略进行退出\nLoadingCache<Key,Graph>graphs=Caffeine.newBuilder()\n.expireAfterAccess(5,TimeUnit.MINUTES)\n.build(key->createExpensiveGraph(key));\nLoadingCache<Key,Graph>graphs=Caffeine.newBuilder()\n.expireAfterWrite(10,TimeUnit.MINUTES)\n.build(key->createExpensiveGraph(key));\n\n//Evictbasedonavaryingexpirationpolicy\n//基于不同的到期策略进行退出\nLoadingCache<Key,Graph>graphs=Caffeine.newBuilder()\n.expireAfter(newExpiry<Key,Graph>(){\n@Override\npubliclongexpireAfterCreate(Keykey,Graphgraph,longcurrentTime){\n//Usewallclocktime,ratherthannanotime,iffromanexternalresource\nlongseconds=graph.creationDate().plusHours(5)\n.minus(System.currentTimeMillis(),MILLIS)\n.toEpochSecond();\nreturnTimeUnit.SECONDS.toNanos(seconds);\n}\n\n@Override\npubliclongexpireAfterUpdate(Keykey,Graphgraph,\nlongcurrentTime,longcurrentDuration){\nreturncurrentDuration;\n}\n\n@Override\npubliclongexpireAfterRead(Keykey,Graphgraph,\nlongcurrentTime,longcurrentDuration){\nreturncurrentDuration;\n}\n})\n.build(key->createExpensiveGraph(key));\n

Caffeine提供了三种定时驱逐策略:

expireAfterAccess(long,TimeUnit):在最后一次访问或者写入后开始计时,在指定的时间后过期。假如一直有请求访问该key,那么这个缓存将一直不会过期。expireAfterWrite(long,TimeUnit):在最后一次写入缓存后开始计时,在指定的时间后过期。expireAfter(Expiry):自定义策略,过期时间由Expiry实现独自计算。

缓存的删除策略使用的是惰性删除和定时删除。这两个删除策略的时间复杂度都是O(1)。

测试定时驱逐不需要等到时间结束。我们可以使用Ticker接口和Caffeine.ticker(Ticker)方法在缓存生成器中指定时间源,而不必等待系统时钟。如:

FakeTickerticker=newFakeTicker();//Guava&34;abc&34;开始&34;结束&34;Key%swasremoved(%s)%n&34;ShortCircuitBoolean&34;evictionLock&34;evictionLock&34;evictionLock&34;NullAway&39;sweightexceedsthemaximum\nif(candidate.getPolicyWeight()>maximum()){\nNode<K,V>evict=candidate;\ncandidate=candidate.getNextInAccessOrder();\nevictEntry(evict,RemovalCause.SIZE,0L);\ncontinue;\n}\n//todo高并发异步删除4.3candidate和victim的频率PK之战bynienat2022/11/30\n\n//Evicttheentrywiththelowestfrequency\n//根据节点的统计频率frequency来做比较,看看要处理掉victim还是candidate\n//admit=ture:准许candidate加入;false:淘汰candidate\n\nif(admit(candidateKey,victimKey)){\nNode<K,V>evict=victim;\nvictim=victim.getNextInAccessOrder();\n//删除victim,从而留下candidate\nevictEntry(evict,RemovalCause.SIZE,0L);\ncandidate=candidate.getNextInAccessOrder();\n}else{\nNode<K,V>evict=candidate;\ncandidate=candidate.getNextInAccessOrder();\n//删除candidate,从而留下victim\nevictEntry(evict,RemovalCause.SIZE,0L);\n}\n}\n}\n\n

上面的代码逻辑是从probation的头尾取出两个node进行比较频率,频率更小者将被remove,其中尾部元素就是上一部分从eden中淘汰出来的元素,

如果将两步逻辑合并起来讲是这样的:

在eden队列通过lru淘汰出来的”候选者“与probation队列通过lru淘汰出来的“被驱逐者“进行频率比较,失败者将被从cache中真正移除。

下面看一下它的比较逻辑admit:

//准许candidate加入:\nbooleanadmit(KcandidateKey,KvictimKey){\n//分别获取victim和candidate的统计频率\nintvictimFreq=frequencySketch().frequency(victimKey);\nintcandidateFreq=frequencySketch().frequency(candidateKey);\n//伊甸园的末位>考察区的末位\nif(candidateFreq>victimFreq){\nreturntrue;//ture:准许candidate加入,淘汰victimKey;false:淘汰candidate,留下victimKey\n}elseif(candidateFreq>=ADMIT_HASHDOS_THRESHOLD){\n//Themaximumfrequencyis15andhalvedto7afteraresettoagethehistory.Anattack\n//exploitsthatahotcandidateisrejectedinfavorofahotvictim.Thethresholdofawarm\n//candidatereducesthenumberofrandomacceptancestominimizetheimpactonthehitrate.\nintrandom=ThreadLocalRandom.current().nextInt();\nreturn((random&127)==0);\n}\n//伊甸园的末位太小false:淘汰candidate,留下victimKey\nreturnfalse;\n}\n

从frequencySketch取出候选者与被驱逐者的频率,如果候选者的频率高就淘汰被驱逐者,如果被驱逐者比候选者的频率高,并且候选者频率小于等于5则淘汰者,如果前面两个条件都不满足则随机淘汰。

probation中这个数据,如何移动到protected中去的呢?

@GuardedBy(&34;)\nvoidreorderProbation(Node<K,V>node){\nif(!accessOrderProbationDeque().contains(node)){\n//Ignorestaleaccessesforanentrythatisnolongerpresent\nreturn;\n}elseif(node.getPolicyWeight()>mainProtectedMaximum()){\nreorder(accessOrderProbationDeque(),node);\nreturn;\n}\n\n//Iftheprotectedspaceexceedsitsmaximum,theLRUitemsaredemotedtotheprobationspace.\n//Thisisdeferredtotheadaptionphaseattheendofthemaintenancecycle.\nsetMainProtectedWeightedSize(mainProtectedWeightedSize()+node.getPolicyWeight());\naccessOrderProbationDeque().remove(node);\naccessOrderProtectedDeque().offerLast(node);\nnode.makeMainProtected();\n}\n\n

当数据被访问时并且该数据在probation中,这个数据就会移动到protected中去,同时通过lru从protected中淘汰一个数据进入到probation中。

这样数据流转的逻辑全部通了:

新数据都会进入到eden中,通过lru淘汰到probation,并与probation中通过lru淘汰的数据进行使用频率pk,如果胜利了就继续留在probation中,如果失败了就会被直接淘汰,当这条数据被访问了,则移动到protected。当其它数据被访问了,则它可能会从protected中通过lru淘汰到probation中。

读写操作后都有后续的一系列“维护”操作

一般缓存对读写操作后都有后续的一系列“维护”操作,Caffeine也不例外,

这些“维护”操作都在maintenance方法,我们将要说到的淘汰策略也在里面。

maintenance过程

这方法比较重要,下面也会提到,

所以这里只先说跟“淘汰策略”有关的evictEntries和climb。

/**\n*Performsthependingmaintenanceworkandsetsthestateflagsduringprocessingtoavoid\n*excessschedulingattempts.\n*\n*排空:\n*1Thereadbuffer\n*2writebuffer\n*3.referencequeues\n*\n*Thereadbuffer,writebuffer,andreferencequeuesaredrained,\n*followedbyexpiration,andsize-basedeviction.\n*\n*@paramtaskanadditionalpendingtasktorun,or{@codenull}ifnotpresent\n*/\n@GuardedBy(&34;)\nvoidmaintenance(@NullableRunnabletask){\nsetDrainStatusRelease(PROCESSING_TO_IDLE);\n\ntry{\ndrainReadBuffer();\n\ndrainWriteBuffer();\nif(task!=null){\ntask.run();\n}\n\ndrainKeyReferences();\ndrainValueReferences();\n//过期符合条件的记录\nexpireEntries();\n//淘汰符合条件的记录\nevictEntries();\n\n//动态调整window区和protected区的大小\nclimb();\n}finally{\nif((drainStatusOpaque()!=PROCESSING_TO_IDLE)||!casDrainStatus(PROCESSING_TO_IDLE,IDLE)){\nsetDrainStatusOpaque(REQUIRED);\n}\n}\n}\n

设置状态位为PROCESSING_TO_IDLE清空读缓存清空写缓存一般只有afterWrite的情况有正在执行的task(比如添加缓存项时发现已达到最大上限,此时task就是正在进行的添加缓存的操作),如果有则执行task清空key引用和value引用队列处理过期项淘汰项调整窗口比例(climbinghill算法)尝试将状态从PROCESSING_TO_IDLE改成IDLE,否则记为REQUIRED

这里有一个小设计:BLCHeader.DrainStatusRef<K,V>包含一个volatile的drainStatus状态位+15个long的填充位。

注:lazySetDrainStatus本质调用的是unsafe的putOrderedInt方法,可以lazyset一个volatile的变量

W-TinyLFU策略的实现用到的数据结构

先说一下Caffeine对上面说到的W-TinyLFU策略的实现用到的数据结构:

//最大的个数限制\nlongmaximum;\n//当前的个数\nlongweightedSize;\n//window区的最大限制\nlongwindowMaximum;\n//window区当前的个数\nlongwindowWeightedSize;\n//protected区的最大限制\nlongmainProtectedMaximum;\n//protected区当前的个数\nlongmainProtectedWeightedSize;\n//下一次需要调整的大小(还需要进一步计算)\ndoublestepSize;\n//window区需要调整的大小\nlongadjustment;\n//命中计数\ninthitsInSample;\n//不命中的计数\nintmissesInSample;\n//上一次的缓存命中率\ndoublepreviousSampleHitRate;\n\nfinalFrequencySketch<K>sketch;\n//window区的LRUqueue(FIFO)\nfinalAccessOrderDeque<Node<K,V>>accessOrderWindowDeque;\n//probation区的LRUqueue(FIFO)\nfinalAccessOrderDeque<Node<K,V>>accessOrderProbationDeque;\n//protected区的LRUqueue(FIFO)\nfinalAccessOrderDeque<Node<K,V>>accessOrderProtectedDeque;\n

以及默认比例设置(意思看注释)

/**Theinitialpercentofthemaximumweightedcapacitydedicatedtothemainspace.*/\nstaticfinaldoublePERCENT_MAIN=0.99d;\n/**Thepercentofthemaximumweightedcapacitydedicatedtothemain&39;sprotectedspace.*/\nstaticfinaldoublePERCENT_MAIN_PROTECTED=0.80d;\n\n/**Thedifferenceinhitratesthatrestartstheclimber.*/\n\nstaticfinaldoubleHILL_CLIMBER_RESTART_THRESHOLD=0.05d;\n/**Thepercentofthetotalsizetoadaptthewindowby.*/\n\nstaticfinaldoubleHILL_CLIMBER_STEP_PERCENT=0.0625d;\n/**Theratetodecreasethestepsizetoadaptby.*/\n\nstaticfinaldoubleHILL_CLIMBER_STEP_DECAY_RATE=0.98d;\n\n/**Themaximumnumberofentriesthatcanbetransferedbetweenqueues.*/\n

重点来了,evictEntries和climb方法:

/**Evictsentriesifthecacheexceedsthemaximum.*/\n@GuardedBy(&34;)\nvoidevictEntries(){\nif(!evicts()){\nreturn;\n}\n//淘汰window区的记录\nintcandidates=evictFromWindow();\n//淘汰Main区的记录\nevictFromMain(candidates);\n}\n

evictFromWindow方法

/**\n*Evictsentriesfromthewindowspaceintothemainspacewhilethewindowsizeexceedsa\n*maximum.\n*\n*@returnthenumberofcandidateentriesevictedfromthewindowspace\n*/\n//根据W-TinyLFU,新的数据都会无条件的加到admissionwindow\n//但是window是有大小限制,所以要“定期”做一下“维护”\n@GuardedBy(&34;)\nintevictFromWindow(){\nintcandidates=0;\n//查看windowqueue的头部节点\nNode<K,V>node=accessOrderWindowDeque().peek();\n//如果window区超过了最大的限制,那么就要把“多出来”的记录做处理\nwhile(windowWeightedSize()>windowMaximum()){\n//Thependingoperationswilladjustthesizetoreflectthecorrectweight\nif(node==null){\nbreak;\n}\n//下一个节点\nNode<K,V>next=node.getNextInAccessOrder();\nif(node.getWeight()!=0){\n//把node定位在probation区\nnode.makeMainProbation();\n//从window区去掉\naccessOrderWindowDeque().remove(node);\n//加入到probationqueue,相当于把节点移动到probation区(晋升了)\naccessOrderProbationDeque().add(node);\ncandidates++;\n//因为移除了一个节点,所以需要调整window的size\nsetWindowWeightedSize(windowWeightedSize()-node.getPolicyWeight());\n}\n//处理下一个节点\nnode=next;\n}\n\nreturncandidates;\n}\n

evictFromMain方法:

//根据W-TinyLFU,从window晋升过来的要跟probation区的进行“PK”,胜者才能留下\n\n@GuardedBy(&34;)\nvoidevictFromMain(@NullableNode<K,V>candidate){\nintvictimQueue=PROBATION;\nintcandidateQueue=PROBATION;\n\n//迭代处理考察区probationqueue\n//victim是probationqueue的头部\nNode<K,V>victim=accessOrderProbationDeque().peekFirst();\nwhile(weightedSize()>maximum()){\n//Searchtheadmissionwindowforadditionalcandidates\n//candidate刚从window晋升来的,最先晋升的那个元素\nif((candidate==null)&&(candidateQueue==PROBATION)){\ncandidate=accessOrderWindowDeque().peekFirst();\ncandidateQueue=WINDOW;\n}\n\n//Tryevictingfromtheprotectedandwindowqueues\nif((candidate==null)&&(victim==null)){\nif(victimQueue==PROBATION){\nvictim=accessOrderProtectedDeque().peekFirst();\nvictimQueue=PROTECTED;\ncontinue;\n}elseif(victimQueue==PROTECTED){\nvictim=accessOrderWindowDeque().peekFirst();\nvictimQueue=WINDOW;\ncontinue;\n}\n\n//Thependingoperationswilladjustthesizetoreflectthecorrectweight\nbreak;\n}\n\n//Skipoverentrieswithzeroweight\nif((victim!=null)&&(victim.getPolicyWeight()==0)){\nvictim=victim.getNextInAccessOrder();\ncontinue;\n}elseif((candidate!=null)&&(candidate.getPolicyWeight()==0)){\ncandidate=candidate.getNextInAccessOrder();\ncontinue;\n}\n\n//Evictimmediatelyifonlyoneoftheentriesispresent\nif(victim==null){\n@SuppressWarnings(&34;)\nNode<K,V>previous=candidate.getNextInAccessOrder();\nNode<K,V>evict=candidate;\ncandidate=previous;\nevictEntry(evict,RemovalCause.SIZE,0L);\ncontinue;\n}elseif(candidate==null){\nNode<K,V>evict=victim;\nvictim=victim.getNextInAccessOrder();\nevictEntry(evict,RemovalCause.SIZE,0L);\ncontinue;\n}\n\n//Evictimmediatelyifbothselectedthesameentry\nif(candidate==victim){\nvictim=victim.getNextInAccessOrder();\nevictEntry(candidate,RemovalCause.SIZE,0L);\ncandidate=null;\ncontinue;\n}\n\n//Evictimmediatelyifanentrywascollected\nKvictimKey=victim.getKey();\nKcandidateKey=candidate.getKey();\nif(victimKey==null){\nNode<K,V>evict=victim;\nvictim=victim.getNextInAccessOrder();\nevictEntry(evict,RemovalCause.COLLECTED,0L);\ncontinue;\n}elseif(candidateKey==null){\nNode<K,V>evict=candidate;\ncandidate=candidate.getNextInAccessOrder();\nevictEntry(evict,RemovalCause.COLLECTED,0L);\ncontinue;\n}\n\n//Evictimmediatelyifanentrywasremoved\nif(!victim.isAlive()){\nNode<K,V>evict=victim;\nvictim=victim.getNextInAccessOrder();\nevictEntry(evict,RemovalCause.SIZE,0L);\ncontinue;\n}elseif(!candidate.isAlive()){\nNode<K,V>evict=candidate;\ncandidate=candidate.getNextInAccessOrder();\nevictEntry(evict,RemovalCause.SIZE,0L);\ncontinue;\n}\n\n//Evictimmediatelyifthecandidate&34;evictionLock&adjustment()}accordingly.*/\n@GuardedBy(&34;)\nvoiddetermineAdjustment(){\n//如果frequencySketch还没初始化,则返回\nif(frequencySketch().isNotInitialized()){\nsetPreviousSampleHitRate(0.0);\nsetMissesInSample(0);\nsetHitsInSample(0);\nreturn;\n}\n//总请求量=命中+miss\nintrequestCount=hitsInSample()+missesInSample();\n//没达到sampleSize则返回\n//默认下sampleSize=10*maximum。用sampleSize来判断缓存是否足够”热“。\nif(requestCount<frequencySketch().sampleSize){\nreturn;\n}\n\n//命中率的公式=命中/总请求\ndoublehitRate=(double)hitsInSample()/requestCount;\n//命中率的差值\ndoublehitRateChange=hitRate-previousSampleHitRate();\n//本次调整的大小,是由命中率的差值和上次的stepSize决定的\ndoubleamount=(hitRateChange>=0)?stepSize():-stepSize();\n//下次的调整大小:如果命中率的之差大于0.05,则重置为0.065*maximum,否则按照0.98来进行衰减\ndoublenextStepSize=(Math.abs(hitRateChange)>=HILL_CLIMBER_RESTART_THRESHOLD)\n?HILL_CLIMBER_STEP_PERCENT*maximum()*(amount>=0?1:-1)\n:HILL_CLIMBER_STEP_DECAY_RATE*amount;\nsetPreviousSampleHitRate(hitRate);\nsetAdjustment((long)amount);\nsetStepSize(nextStepSize);\nsetMissesInSample(0);\nsetHitsInSample(0);\n}\n\n/**Transfersthenodesfromtheprotectedtotheprobationregionifitexceedsthemaximum.*/\n\n//这个方法比较简单,减少protected区溢出的部分\n@GuardedBy(&34;)\nvoiddemoteFromMainProtected(){\nlongmainProtectedMaximum=mainProtectedMaximum();\nlongmainProtectedWeightedSize=mainProtectedWeightedSize();\nif(mainProtectedWeightedSize<=mainProtectedMaximum){\nreturn;\n}\n\nfor(inti=0;i<QUEUE_TRANSFER_THRESHOLD;i++){\nif(mainProtectedWeightedSize<=mainProtectedMaximum){\nbreak;\n}\n\nNode<K,V>demoted=accessOrderProtectedDeque().poll();\nif(demoted==null){\nbreak;\n}\ndemoted.makeMainProbation();\naccessOrderProbationDeque().add(demoted);\nmainProtectedWeightedSize-=demoted.getPolicyWeight();\n}\nsetMainProtectedWeightedSize(mainProtectedWeightedSize);\n}\n\n/**\n*Increasesthesizeoftheadmissionwindowbyshrinkingtheportionallocatedtothemain\n*space.Asthemainspaceispartitionedintoprobationandprotectedregions(80%/20%),for\n*simplicityonlytheprotectedisreduced.Iftheregionsexceedtheirmaximums,thismaycause\n*protecteditemstobedemotedtotheprobationregionandprobationitemstobedemotedtothe\n*admissionwindow.\n*/\n\n//增加window区的大小,这个方法比较简单,思路就像我上面说的\n@GuardedBy(&34;)\nvoidincreaseWindow(){\nif(mainProtectedMaximum()==0){\nreturn;\n}\n\nlongquota=Math.min(adjustment(),mainProtectedMaximum());\nsetMainProtectedMaximum(mainProtectedMaximum()-quota);\nsetWindowMaximum(windowMaximum()+quota);\ndemoteFromMainProtected();\n\nfor(inti=0;i<QUEUE_TRANSFER_THRESHOLD;i++){\nNode<K,V>candidate=accessOrderProbationDeque().peek();\nbooleanprobation=true;\nif((candidate==null)||(quota<candidate.getPolicyWeight())){\ncandidate=accessOrderProtectedDeque().peek();\nprobation=false;\n}\nif(candidate==null){\nbreak;\n}\n\nintweight=candidate.getPolicyWeight();\nif(quota<weight){\nbreak;\n}\n\nquota-=weight;\nif(probation){\naccessOrderProbationDeque().remove(candidate);\n}else{\nsetMainProtectedWeightedSize(mainProtectedWeightedSize()-weight);\naccessOrderProtectedDeque().remove(candidate);\n}\nsetWindowWeightedSize(windowWeightedSize()+weight);\naccessOrderWindowDeque().add(candidate);\ncandidate.makeWindow();\n}\n\nsetMainProtectedMaximum(mainProtectedMaximum()+quota);\nsetWindowMaximum(windowMaximum()-quota);\nsetAdjustment(quota);\n}\n\n/**Decreasesthesizeoftheadmissionwindowandincreasesthemain&34;evictionLock&34;evictionLock&39;slocationinthepagereplacementpolicy.*/\n@GuardedBy(&34;)\nvoidonAccess(Node<K,V>node){\nif(evicts()){\nKkey=node.getKey();\nif(key==null){\nreturn;\n}\nfrequencySketch().increment(key);\nif(node.inWindow()){\nreorder(accessOrderWindowDeque(),node);\n}elseif(node.inMainProbation()){\nreorderProbation(node);\n}else{\nreorder(accessOrderProtectedDeque(),node);\n}\nsetHitsInSample(hitsInSample()+1);\n}elseif(expiresAfterAccess()){\nreorder(accessOrderWindowDeque(),node);\n}\nif(expiresVariable()){\ntimerWheel().reschedule(node);\n}\n}\n

这个onAccess主要是:

统计tinyLFU的计数将节点在队列中重排序,以及更新统计信息。

Caffeine的性能比较

吞吐量PK

可以清楚的看到Caffeine效率明显的高于其他缓存。

Read(100%)

Inthisbenchmark8threadsconcurrentlyreadfromacacheconfiguredwithamaximumsize.

Read(75%)/Write(25%)

Inthisbenchmark6threadsconcurrentlyreadfromand2threadswritetoacacheconfiguredwithamaximumsize.

Write(100%)

Inthisbenchmark8threadsconcurrentlywritetoacacheconfiguredwithamaximumsize.

Server-class

ThebenchmarkswererunonanAzureG4instance,thelargestavailableduringafreetrialfromthemajorcloudproviders.ThemachinewasreportedasasinglesocketXeonE5-2698Bv3@2.00GHz(16core,hyperthreadingdisabled),224GB,Ubuntu15.04.

异步的高性能读写

解决CAS恶性空自旋的有效方式之一是以空间换时间,较为常见的方案有两种:分散操作热点、使用队列削峰。

一般的缓存每次对数据处理完之后(读的话,已经存在则直接返回,不存在则load数据,保存,再返回;写的话,则直接插入或更新),但是因为要维护一些淘汰策略,则需要一些额外的操作,诸如:

计算和比较数据的是否过期统计频率(像LFU或其变种)维护readqueue和writequeue淘汰符合条件的数据等等。。。

这种数据的读写伴随着缓存状态的变更,GuavaCache的做法是把这些操作和读写操作放在一起,在一个同步加锁的操作中完成,

虽然GuavaCache巧妙地利用了JDK的ConcurrentHashMap(分段锁或者无锁CAS)来降低锁的密度,达到提高并发度的目的。

但是,对于一些热点数据,这种做法还是避免不了频繁的锁竞争。

Caffeine借鉴了数据库系统的WAL(Write-AheadLogging)思想,即:先写日志,再执行操作

这种思想同样适合缓存的,

执行读写操作时,先把操作记录在缓冲区,然后在合适的时机异步、批量地执行缓冲区中的内容。

但在执行缓冲区的内容时,也是需要在缓冲区加上同步锁的,不然存在并发问题,

只不过这样就可以把对锁的竞争从缓存数据转移到对缓冲区上。

ReadBuffer读缓冲

在Caffeine的内部实现中,为了很好的支持不同的Features(如Eviction,Removal,Refresh,Statistics,Cleanup,Policy等等),扩展了很多子类,

它们共同的父类是BoundedLocalCache,而readBuffer就是作为它们共有的属性,即都是用一样的readBuffer,

ReadBuffer读缓冲定义:

finalBuffer<Node<K,V>>readBuffer;\n

ReadBuffer读缓冲初始化:

readBuffer=evicts()||collectKeys()||collectValues()||expiresAfterAccess()\n?newBoundedBuffer<>()\n:Buffer.disabled();\n

readBuffer的类型是BoundedBuffer,它的实现是一个StripedRing(条带隔离的环形)的buffer

首先考虑到readBuffer的特点是多生产者-单消费者(MPSC),所以只需要考虑写入端的并发问题。

生产者并行(可能存在竞争)读取计数,检查是否有可用的容量,如果可用,则尝试一下CAS写入计数的操作。

如果增量成功,则生产者会懒发布这个元素。

由于CAS失败或缓冲区已满而失败时,生产方不会重试或阻塞。

消费者读取计数并获取可用元素,然后清除元素的并懒设置读取计数。

缓冲区分成很多条带(这就是Striped的含义)。

如果检测到竞争,则重新哈希、并动态添加新缓冲区来进一步提高并发性,直到一个内部最大值。

当重新哈希发现了可用的缓冲区时,生产者可以重试添加元素以确定是否找到了满足的缓冲区,或者是否有必要调整大小。

具体代码不再列举,一些关键参数:

每条ringbuffer允许16个元素(每个元素一个4字节的引用)最大允许4*ceilingNextPowerOfTwo(CPU数)条ringbuffer

ceilingNextPowerOfTwo表示向上取2的整数幂

触发afterRead

Caffeine对每次缓存的读操作都会触发afterRead

/**\n*Performsthepost-processingworkrequiredafteraread.\n*\n*@paramnodetheentryinthepagereplacementpolicy\n*@paramnowthecurrenttime,innanoseconds\n*@paramrecordHitifthehitcountshouldbeincremented\n*/\nvoidafterRead(Node<K,V>node,longnow,booleanrecordHit){\nif(recordHit){\nstatsCounter().recordHits(1);\n}\n//把记录加入到readBuffer\n//判断是否需要立即处理readBuffer\n//注意这里无论offer是否成功都可以走下去的,即允许写入readBuffer丢失,因为这个\n\nbooleandelayable=skipReadBuffer()||(readBuffer.offer(node)!=Buffer.FULL);\nif(shouldDrainBuffers(delayable)){\nscheduleDrainBuffers();\n}\nrefreshIfNeeded(node,now);\n}\n\n/**\n*Returnswhethermaintenanceworkisneeded.\n*\n*@paramdelayableifdrainingthereadbuffercanbedelayed\n*/\n\n//caffeine用了一组状态来定义和管理“维护”的过程\nbooleanshouldDrainBuffers(booleandelayable){\nswitch(drainStatus()){\ncaseIDLE:\nreturn!delayable;\ncaseREQUIRED:\nreturntrue;\ncasePROCESSING_TO_IDLE:\ncasePROCESSING_TO_REQUIRED:\nreturnfalse;\ndefault:\nthrownewIllegalStateException();\n}\n}\n\n

重点看BoundedBuffer

/**\n*Astriped,non-blocking,boundedbuffer.\n*\n*@authorben.manes@gmail.com(BenManes)\n*@param<E>thetypeofelementsmaintainedbythisbuffer\n*/\nfinalclassBoundedBuffer<E>extendsStripedBuffer<E>\n\n

它是一个striped、非阻塞、有界限的buffer,继承于StripedBuffer类。

下面看看StripedBuffer的实现:

/**\n*Abaseclassprovidingthemechanicsforsupportingdynamicstripingofboundedbuffers.This\n*implementationisanadaptionofthenumeric64-bit{@linkjava.util.concurrent.atomic.Striped64}\n*class,whichisusedbyatomiccounters.Theapproachwasmodifiedtolazilygrowanarrayof\n*buffersinordertominimizememoryusageforcachesthatarenotheavilycontendedon.\n*\n*@authordl@cs.oswego.edu(DougLea)\n*@authorben.manes@gmail.com(BenManes)\n*/\n\nabstractclassStripedBuffer<E>implementsBuffer<E>\n\n

分散操作热点

解决CAS恶性空自旋的有效方式之一是以空间换时间,

较为常见的方案有两种:

分散操作热点、使用队列削峰。

这个StripedBuffer设计的思想是跟Striped64类似的,通过扩展结构把**分散操作热点(/竞争热点分离)**。

具体实现是这样的,StripedBuffer维护一个Buffer[]数组,每个元素就是一个RingBuffer,

每个线程用自己threadLocalRandomProbe属性作为hash值,这样就相当于每个线程都有自己“专属”的RingBuffer,就不会产生竞争啦,

而不是用key的hashCode作为hash值,因为会产生热点数据问题。

看看StripedBuffer的属性

/**Tableofbuffers.Whennon-null,sizeisapowerof2.*/\n//RingBuffer数组\ntransientvolatileBuffer<E>@Nullable[]table;\n\n//当进行resize时,需要整个table锁住。tableBusy作为CAS的标记。\nstaticfinallongTABLE_BUSY=UnsafeAccess.objectFieldOffset(StripedBuffer.class,&34;);\nstaticfinallongPROBE=UnsafeAccess.objectFieldOffset(Thread.class,&34;);\n\n/**NumberofCPUS.*/\nstaticfinalintNCPU=Runtime.getRuntime().availableProcessors();\n\n/**Theboundonthetablesize.*/\n//table最大size\nstaticfinalintMAXIMUM_TABLE_SIZE=4*ceilingNextPowerOfTwo(NCPU);\n\n/**Themaximumnumberofattemptswhentryingtoexpandthetable.*/\n//如果发生竞争时(CAS失败)的尝试次数\nstaticfinalintATTEMPTS=3;\n\n/**Tableofbuffers.Whennon-null,sizeisapowerof2.*/\n//核心数据结构\ntransientvolatileBuffer<E>@Nullable[]table;\n\n/**Spinlock(lockedviaCAS)usedwhenresizingand/orcreatingBuffers.*/\ntransientvolatileinttableBusy;\n\n/**CASesthetableBusyfieldfrom0to1toacquirelock.*/\nfinalbooleancasTableBusy(){\nreturnUnsafeAccess.UNSAFE.compareAndSwapInt(this,TABLE_BUSY,0,1);\n}\n\n/**\n*Returnstheprobevalueforthecurrentthread.DuplicatedfromThreadLocalRandombecauseof\n*packagingrestrictions.\n*/\nstaticfinalintgetProbe(){\nreturnUnsafeAccess.UNSAFE.getInt(Thread.currentThread(),PROBE);\n}\n/**Tableofbuffers.Whennon-null,sizeisapowerof2.*///RingBuffer数组transientvolatileBuffer<E>@Nullable[]table;//当进行resize时,需要整个table锁住。tableBusy作为CAS的标记。staticfinallongTABLE_BUSY=UnsafeAccess.objectFieldOffset(StripedBuffer.class,&34;);staticfinallongPROBE=UnsafeAccess.objectFieldOffset(Thread.class,&34;);/**NumberofCPUS.*/staticfinalintNCPU=Runtime.getRuntime().availableProcessors();/**Theboundonthetablesize.*///table最大sizestaticfinalintMAXIMUM_TABLE_SIZE=4*ceilingNextPowerOfTwo(NCPU);/**Themaximumnumberofattemptswhentryingtoexpandthetable.*///如果发生竞争时(CAS失败)的尝试次数staticfinalintATTEMPTS=3;/**Tableofbuffers.Whennon-null,sizeisapowerof2.*///核心数据结构transientvolatileBuffer<E>@Nullable[]table;/**Spinlock(lockedviaCAS)usedwhenresizingand/orcreatingBuffers.*/transientvolatileinttableBusy;/**CASesthetableBusyfieldfrom0to1toacquirelock.*/finalbooleancasTableBusy(){returnUnsafeAccess.UNSAFE.compareAndSwapInt(this,TABLE_BUSY,0,1);}/***Returnstheprobevalueforthecurrentthread.DuplicatedfromThreadLocalRandombecauseof*packagingrestrictions.*/staticfinalintgetProbe(){returnUnsafeAccess.UNSAFE.getInt(Thread.currentThread(),PROBE);}\n

offer方法,当没初始化或存在竞争时,则扩容为2倍。

实际是调用RingBuffer的offer方法,把数据追加到RingBuffer后面。

@Override\npublicintoffer(Ee){\nintmask;\nintresult=0;\nBuffer<E>buffer;\n//是否不存在竞争\nbooleanuncontended=true;\nBuffer<E>[]buffers=table\n//是否已经初始化\nif((buffers==null)\n||(mask=buffers.length-1)<0\n//用thread的随机值作为hash值,得到对应位置的RingBuffer\n||(buffer=buffers[getProbe()&mask])==null\n//检查追加到RingBuffer是否成功\n||!(uncontended=((result=buffer.offer(e))!=Buffer.FAILED))){\n//其中一个符合条件则进行扩容\nexpandOrRetry(e,uncontended);\n}\nreturnresult;\n}\n\n/**\n*Handlescasesofupdatesinvolvinginitialization,resizing,creatingnewBuffers,and/or\n*contention.Seeaboveforexplanation.Thismethodsufferstheusualnon-modularityproblemsof\n*optimisticretrycode,relyingonrecheckedsetsofreads.\n*\n*@parametheelementtoadd\n*@paramwasUncontendedfalseifCASfailedbeforecall\n*/\n\n//这个方法比较长,但思路还是相对清晰的。\n@SuppressWarnings(&34;)\nfinalvoidexpandOrRetry(Ee,booleanwasUncontended){\ninth;\nif((h=getProbe())==0){\nThreadLocalRandom.current();//forceinitialization\nh=getProbe();\nwasUncontended=true;\n}\nbooleancollide=false;//Trueiflastslotnonempty\nfor(intattempt=0;attempt<ATTEMPTS;attempt++){\nBuffer<E>[]buffers;\nBuffer<E>buffer;\nintn;\nif(((buffers=table)!=null)&&((n=buffers.length)>0)){\nif((buffer=buffers[(n-1)&h])==null){\nif((tableBusy==0)&&casTableBusy()){//TrytoattachnewBuffer\nbooleancreated=false;\ntry{//Recheckunderlock\nBuffer<E>[]rs;\nintmask,j;\nif(((rs=table)!=null)&&((mask=rs.length)>0)\n&&(rs[j=(mask-1)&h]==null)){\nrs[j]=create(e);\ncreated=true;\n}\n}finally{\ntableBusy=0;\n}\nif(created){\nbreak;\n}\ncontinue;//Slotisnownon-empty\n}\ncollide=false;\n}elseif(!wasUncontended){//CASalreadyknowntofail\nwasUncontended=true;//Continueafterrehash\n}elseif(buffer.offer(e)!=Buffer.FAILED){\nbreak;\n}elseif(n>=MAXIMUM_TABLE_SIZE||table!=buffers){\ncollide=false;//Atmaxsizeorstale\n}elseif(!collide){\ncollide=true;\n}elseif(tableBusy==0&&casTableBusy()){\ntry{\nif(table==buffers){//Expandtableunlessstale\ntable=Arrays.copyOf(buffers,n<<1);\n}\n}finally{\ntableBusy=0;\n}\ncollide=false;\ncontinue;//Retrywithexpandedtable\n}\nh=advanceProbe(h);\n}elseif((tableBusy==0)&&(table==buffers)&&casTableBusy()){\nbooleaninit=false;\ntry{//Initializetable\nif(table==buffers){\n@SuppressWarnings({&34;,&34;})\nBuffer<E>[]rs=newBuffer[1];\nrs[0]=create(e);\ntable=rs;\ninit=true;\n}\n}finally{\ntableBusy=0;\n}\nif(init){\nbreak;\n}\n}\n}\n}\n\n

最后看看RingBuffer,注意RingBuffer是BoundedBuffer的内部类。

/**Themaximumnumberofelementsperbuffer.*/\nstaticfinalintBUFFER_SIZE=16;\n\n//Assume4-bytereferencesand64-bytecacheline(16elementsperline)\n//256长度,但是是以16为单位,所以最多存放16个元素\nstaticfinalintSPACED_SIZE=BUFFER_SIZE<<4;\nstaticfinalintSPACED_MASK=SPACED_SIZE-1;\nstaticfinalintOFFSET=16;\n//RingBuffer数组\nfinalAtomicReferenceArray<E>buffer;\n\n//插入方法\n@Override\npublicintoffer(Ee){\nlonghead=readCounter;\nlongtail=relaxedWriteCounter();\n//用head和tail来限制个数\nlongsize=(tail-head);\nif(size>=SPACED_SIZE){\nreturnBuffer.FULL;\n}\n//tail追加16\nif(casWriteCounter(tail,tail+OFFSET)){\n//用tail“取余”得到下标\nintindex=(int)(tail&SPACED_MASK);\n//用unsafe.putOrderedObject设值\nbuffer.lazySet(index,e);\nreturnBuffer.SUCCESS;\n}\n//如果CAS失败则返回失败\nreturnBuffer.FAILED;\n}\n\n//用consumer来处理buffer的数据\n@Override\npublicvoiddrainTo(Consumer<E>consumer){\nlonghead=readCounter;\nlongtail=relaxedWriteCounter();\n//判断数据多少\nlongsize=(tail-head);\nif(size==0){\nreturn;\n}\ndo{\nintindex=(int)(head&SPACED_MASK);\nEe=buffer.get(index);\nif(e==null){\n//notpublishedyet\nbreak;\n}\nbuffer.lazySet(index,null);\nconsumer.accept(e);\n//head也跟tail一样,每次递增16\nhead+=OFFSET;\n}while(head!=tail);\nlazySetReadCounter(head);\n}\n\n

注意,ringbuffer的size(固定是16个)是不变的,变的是head和tail而已。

总的来说ReadBuffer有如下特点:

使用Striped-RingBuffer来提升对buffer的读写用thread的hash来避开热点key的竞争允许写入的丢失

WriteBuffer

本来缓存的一般场景是读多写少的,读的并发会更高,且afterRead显得没那么重要,允许延迟甚至丢失。

writeBuffer跟readBuffer不一样,主要体现在使用场景的不一样。

写不一样,写afterWrite不允许丢失,且要求尽量马上执行

Caffeine使用MPSC(MultipleProducer/SingleConsumer)作为buffer数组,

实现在MpscGrowableArrayQueue类,它是仿照JCTools的MpscGrowableArrayQueue来写的。

MPSC允许无锁的高并发写入,但只允许一个消费者,同时也牺牲了部分操作。

TimerWheel

除了支持expireAfterAccess和expireAfterWrite之外(GuavaCache也支持这两个特性),Caffeine还支持expireAfter。

因为expireAfterAccess和expireAfterWrite都只能是固定的过期时间,这可能满足不了某些场景,譬如记录的过期时间是需要根据某些条件而不一样的,这就需要用户自定义过期时间。

先看看expireAfter的用法

packagecom.github.benmanes.caffeine.demo;\n\nimportcom.github.benmanes.caffeine.cache.Cache;\nimportcom.github.benmanes.caffeine.cache.CacheLoader;\nimportcom.github.benmanes.caffeine.cache.Caffeine;\nimportcom.github.benmanes.caffeine.cache.Expiry;\nimportorg.checkerframework.checker.index.qual.NonNegative;\nimportorg.checkerframework.checker.nullness.qual.NonNull;\nimportorg.checkerframework.checker.nullness.qual.Nullable;\n\nimportjava.util.concurrent.TimeUnit;\n\npublicclassExpireAfterDemo{\nstaticSystem.Loggerlogger=System.getLogger(ExpireAfterDemo.class.getName());\n\npublicstaticvoidhello(String[]args){\nSystem.out.println(&34;+args);\n}\n\n\npublicstaticvoidmain(String…args)throwsException{\nCache<String,String>cache=Caffeine.newBuilder()\n//最大个数限制\n//最大容量1024个,超过会自动清理空间\n.maximumSize(1024)\n//初始化容量\n.initialCapacity(1)\n//访问后过期(包括读和写)\n//5秒没有读写自动删除\n//.expireAfterAccess(5,TimeUnit.SECONDS)\n//写后过期\n//.expireAfterWrite(2,TimeUnit.HOURS)\n//写后自动异步刷新\n//.refreshAfterWrite(1,TimeUnit.HOURS)\n//记录下缓存的一些统计数据,例如命中率等\n.recordStats()\n.removalListener(((key,value,cause)->{\n//清理通知key,value==>键值对cause==>清理原因\nSystem.out.println(&34;+key);\n}))\n.expireAfter(newExpiry<String,String>(){\n//返回创建后的过期时间\n@Override\npubliclongexpireAfterCreate(@NonNullStringkey,@NonNullStringvalue,longcurrentTime){\nSystem.out.println(&34;+key);\nreturn0;\n}\n\n//返回更新后的过期时间\n@Override\npubliclongexpireAfterUpdate(@NonNullStringkey,@NonNullStringvalue,longcurrentTime,@NonNegativelongcurrentDuration){\nSystem.out.println(&34;+key);\nreturn0;\n}\n\n//返回读取后的过期时间\n@Override\npubliclongexpireAfterRead(@NonNullStringkey,@NonNullStringvalue,longcurrentTime,@NonNegativelongcurrentDuration){\nSystem.out.println(&34;+key);\nreturn0;\n}\n})\n.recordStats()\n//使用CacheLoader创建一个LoadingCache\n.build(newCacheLoader<String,String>(){\n//同步加载数据\n@Nullable\n@Override\npublicStringload(@NonNullStringkey)throwsException{\nSystem.out.println(&34;+key);\nreturn&34;+key;\n}\n\n//异步加载数据\n@Nullable\n@Override\npublicStringreload(@NonNullStringkey,@NonNullStringoldValue)throwsException{\nSystem.out.println(&34;+key);\nreturn&34;+key;\n}\n});\n\n//添加值\ncache.put(&34;,&34;);\ncache.put(&34;,&34;);\n\n//获取值\n@NullableStringvalue=cache.getIfPresent(&34;);\nSystem.out.println(&34;+value);\n//remove\ncache.invalidate(&34;);\nvalue=cache.getIfPresent(&34;);\nSystem.out.println(&34;+value);\n}\n\n}\n\n

通过自定义过期时间,使得不同的key可以动态的得到不同的过期时间。

注意,我把expireAfterAccess和expireAfterWrite注释了,因为这两个特性不能跟expireAfter一起使用。

而当使用了expireAfter特性后,Caffeine会启用一种叫“时间轮”的算法来实现这个功能。

为什么要用时间轮

好,重点来了,为什么要用时间轮?

对expireAfterAccess和expireAfterWrite的实现是用一个AccessOrderDeque双端队列,它是FIFO的

因为它们的过期时间是固定的,所以在队列头的数据肯定是最早过期的,要处理过期数据时,只需要首先看看头部是否过期,然后再挨个检查就可以了。

但是,如果过期时间不一样的话,这需要对accessOrderQueue进行排序&插入,这个代价太大了。

于是,Caffeine用了一种更加高效、优雅的算法-时间轮。

时间轮的结构:

Caffeine对时间轮的实现在TimerWheel,它是一种多层时间轮(hierarchicaltimingwheels)。

看看元素加入到时间轮的schedule方法:

/**\n*Schedulesatimereventforthenode.\n*\n*@paramnodetheentryinthecache\n*/\npublicvoidschedule(@NonNullNode<K,V>node){\nNode<K,V>sentinel=findBucket(node.getVariableTime());\nlink(sentinel,node);\n}\n\n/**\n*Determinesthebucketthatthetimereventshouldbeaddedto.\n*\n*@paramtimethetimewhentheeventfires\n*@returnthesentinelattheheadofthebucket\n*/\nNode<K,V>findBucket(longtime){\nlongduration=time-nanos;\nintlength=wheel.length-1;\nfor(inti=0;i<length;i++){\nif(duration<SPANS[i+1]){\nlongticks=(time>>>SHIFT[i]);\nintindex=(int)(ticks&(wheel[i].length-1));\nreturnwheel[i][index];\n}\n}\nreturnwheel[length][0];\n}\n\n/**Addstheentryatthetailofthebucket&34;plainStr&34;staticStr&34;reflectStr&39;svariabletypeis{@codetype}andithasone\n*coordinatetype,{@coderecv}.\n*<p>\n*Accesscheckingisperformedimmediatelyonbehalfofthelookup\n*class.\n*<p>\n*CertainaccessmodesofthereturnedVarHandleareunsupportedunder\n*thefollowingconditions:\n*<ul>\n*<li>ifthefieldisdeclared{@codefinal},thenthewrite,atomic\n*update,numericatomicupdate,andbitwiseatomicupdateaccess\n*modesareunsupported.\n*<li>ifthefieldtypeisanythingotherthan{@codebyte},\n*{@codeshort},{@codechar},{@codeint},{@codelong},\n*{@codefloat},or{@codedouble}thennumericatomicupdate\n*accessmodesareunsupported.\n*<li>ifthefieldtypeisanythingotherthan{@codeboolean},\n*{@codebyte},{@codeshort},{@codechar},{@codeint}or\n*{@codelong}thenbitwiseatomicupdateaccessmodesare\n*unsupported.\n*</ul>\n*<p>\n*Ifthefieldisdeclared{@codevolatile}thenthereturnedVarHandle\n*willoverrideaccesstothefield(effectivelyignorethe\n*{@codevolatile}declaration)inaccordancetoitsspecified\n*accessmodes.\n*<p>\n*Ifthefieldtypeis{@codefloat}or{@codedouble}thennumeric\n*andatomicupdateaccessmodescomparevaluesusingtheirbitwise\n*representation(see{@linkFloatdoubleToRawLongBits},respectively).\n*@apiNote\n*Bitwisecomparisonof{@codefloat}valuesor{@codedouble}values,\n*asperformedbythenumericandatomicupdateaccessmodes,differ\n*fromtheprimitive{@code==}operatorandthe{@linkFloatequals}methods,specificallywithrespectto\n*comparingNaNvaluesorcomparing{@code-0.0}with{@code+0.0}.\n*Careshouldbetakenwhenperformingacompareandsetoracompare\n*andexchangeoperationwithsuchvaluessincetheoperationmay\n*unexpectedlyfail.\n*TherearemanypossibleNaNvaluesthatareconsideredtobe\n*{@codeNaN}inJava,althoughnoIEEE754floating-pointoperation\n*providedbyJavacandistinguishbetweenthem.Operationfailurecan\n*occuriftheexpectedorwitnessvalueisaNaNvalueanditis\n*transformed(perhapsinaplatformspecificmanner)intoanotherNaN\n*value,andthushasadifferentbitwiserepresentation(see\n*{@linkFloatlongBitsToDouble}formore\n*details).\n*Thevalues{@code-0.0}and{@code+0.0}havedifferentbitwise\n*representationsbutareconsideredequalwhenusingtheprimitive\n*{@code==}operator.Operationfailurecanoccurif,forexample,a\n*numericalgorithmcomputesanexpectedvaluetobesay{@code-0.0}\n*andpreviouslycomputedthewitnessvaluetobesay{@code+0.0}.\n*@paramrecvthereceiverclass,oftype{@codeR},thatdeclaresthe\n*non-staticfield\n*@paramnamethefield&39;stype,oftype{@codeT}\n*@returnaVarHandlegivingaccesstonon-staticfields.\n*@throwsNoSuchFieldExceptionifthefielddoesnotexist\n*@throwsIllegalAccessExceptionifaccesscheckingfails,orifthefieldis{@codestatic}\n*@exceptionSecurityExceptionifasecuritymanagerispresentandit\n*<ahref=&secmgr&34;Iamaplainstring&34;Iamastaticstring&34;Iamastringcreatedbyreflection&34;Iamastringarrayelement&34;Iamvolatilestring&34;Iamanewcasstring&getVarHandle.get}\n*/\nGET(&34;,AccessType.GET),\n/**\n*Theaccessmodewhoseaccessisspecifiedbythecorresponding\n*method\n*{@linkVarHandle34;set&getVolatileVarHandle.getVolatile}\n*/\nGET_VOLATILE(&34;,AccessType.GET),\n/**\n*Theaccessmodewhoseaccessisspecifiedbythecorresponding\n*method\n*{@linkVarHandle34;setVolatile&getAcquireVarHandle.getAcquire}\n*/\n\n

不同的访问模式对于指令重排的效果都可能不一样,因此需要慎重选择访问模式。

其次,在创建VarHandle的过程中,VarHandle内部的访问模式会覆盖变量声明时的任何指令排序效果。

比如说一个变量被声明为volatile类型,但是调用varHandle.get()方法时,其访问模式就是get模式,即简单读取方法,因此需要多加注意。

参考文献

疯狂创客圈JAVA高并发总目录ThreadLocal(史上最全)https://www.cnblogs.com/crazymakercircle/p/14491965.html3000页《尼恩Java面试宝典》的35个面试专题:https://www.cnblogs.com/crazymakercircle/p/13917138.html价值10W的架构师知识图谱https://www.processon.com/view/link/60fb9421637689719d246739

4、尼恩架构师哲学https://www.processon.com/view/link/616f801963768961e9d9aec8

5、尼恩3高架构知识宇宙https://www.processon.com/view/link/635097d2e0b34d40be778ab4

GuavaCache主页:https://github.com/google/guava/wiki/CachesExplained

Caffeine的官网:https://github.com/ben-manes/caffeine/wiki/Benchmarks

https://gitee.com/jd-platform-opensource/hotkey

https://developer.aliyun.com/article/788271?utm_content=m_1000291945

https://b.alipay.com/page/account-manage-oc/approval/setList

Caffeine:https://github.com/ben-manes/caffeine

这里:https://albenw.github.io/posts/df42dc84/

Benchmarks:https://github.com/ben-manes/caffeine/wiki/Benchmarks

官方API说明文档:https://github.com/ben-manes/caffeine/wiki

这里:https://github.com/ben-manes/caffeine/wiki/Guava

HashedWheelTimer时间轮原理分析:https://albenw.github.io/posts/ec8df8c/

TinyLFU论文:https://arxiv.org/abs/1512.00727

DesignOfAModernCache:http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html

DesignOfAModernCache—PartDeux:http://highscalability.com/blog/2019/2/25/design-of-a-modern-cachepart-deux.html

Caffeine的github:https://github.com/ben-manes/caffeine

关于疯狂的解析网站源码分享的内容到此结束,希望对大家有所帮助。

Published by

风君子

独自遨游何稽首 揭天掀地慰生平