目录
滑动窗口算法
Sentinel
数据模型
示例
大致流程
entry
entryWithPriority
FlowSlot.entry
checkFlow
canPass
avgUsedTokens
passQps
pass
currentWindow
calculateTimeIdx
calculateWindowStart
values
滑动窗口算法
滑动窗口算法是将时间周期分为n个小周期,分别记录每个小周期内的访问次数,并且根据时间滑动删除过期的小周期。
如下图,假设时间周期为1min,将1min再分割成2个小周期,统计每个小周期的访问数量,则可以看到,第一个时间周期内访问数量为75,第二个时间周期内访问数量为100,超过100的数量被限流掉了。
由此可见,当滑动窗口格子划分得越多,那么滑动窗口的滚动就越平滑,限流的统计就越精确。可以很好的解决固定窗口的流动问题。
Sentinel
滑动窗口算法也是Sentinel的默认算法。
数据模型
英文名称 | 中文名称 | 备注 |
array | 窗口数组 | |
windowLengthInMs | 单个窗口时间长度 | |
sampleCount | 总窗口数量 | |
intervalInMs | 时间窗口总长度 | sampleCount * windowLengthInMs |
示例
|
大致流程
点击示例里的entry()方法,如下所示:
entry
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
return entryWithPriority(resourceWrapper, count, false, args);
}
点击entryWithPriority()方法,如下所示:
entryWithPriority
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
.. ...
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
点击chain.entry()方法,因为我们这次探究的是限流算法,所以选择FlowSlot类,如下所示:
FlowSlot.entry
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
点击checkFlow()方法,如下所示:
checkFlow
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
canPass
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
... ...
return false;
}
return true;
}
在这里,获取已经使用的token数量,加上待申请的数量,如果超过流控规则里设置的最大值,则返回false。
点击avgUsedTokens()方法,如下所示:
avgUsedTokens
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
在这里,通过调用node.passQps()获取已经使用的token数量。
点击passQps(),如下所示:
passQps
public double passQps() {
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
在这里,rollingCounterInSecond对象保存了时间窗口对象的数组。pass()方法可以获取每个有效的时间窗口对象里已经使用的令牌数量,getWindowIntervalInSec()方法是时间窗口总长度,以秒为单位。两者相除就可以已使用的QPS。
点击pass()方法,如下所示:
pass
public long pass() {
data.currentWindow();
long pass = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
pass += window.pass();
}
return pass;
}
在这里,会调用currentWindow()刷新当前窗口信息,然后累加每个窗口的计数值作为当前计数周期的计数值。
点击currentWindow()方法,如下所示:
currentWindow
public WindowWrap<T> currentWindow(long timeMillis) {
int idx = calculateTimeIdx(timeMillis);
long windowStart = calculateWindowStart(timeMillis);
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 创建新窗口
WindowWrap<T> window = new WindowWrap<>(windowLengthInMs, windowStart, newEmptyBucket());
if (array.compareAndSet(idx, null, window)) {
return window;
}
} else if (windowStart == old.windowStart()) {
// 命中当前有效窗口
return old;
} else if (windowStart > old.windowStart()) {
// 窗口已过期,重置并复用
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
}
}
}
}
在这里:
- 获取当前时间窗口的索引
- 获取当前窗口的起始时间
- 根据当前时间窗口的索引,获取时间窗口对象,如果时间窗口对象为空,则创建一个新时间窗口对象;如果已经存在时间窗口对象,则返回该对象;如果时间窗口对象已过期,则重置并复用。
calculateTimeIdx
public int calculateTimeIdx(long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
return (int)(timeId % array.length());
}
在这里,根据当前时间戳计算对应窗口索引。
calculateWindowStart
protected long calculateWindowStart(long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
在这里,计算当前窗口的起始时间(对齐到窗口边界)。
点击步骤pass的values()方法,如下所示:
values
public List<T> values() {
return values(TimeUtil.currentTimeMillis());
}
public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}
在这里,循环时间窗口数组,忽略已经失效的时间窗口对象,将有效的时间窗口对象保存在一个列表对象里,并作为方法返回值进行返回。
isWindowDeprecated
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
return time - windowWrap.windowStart() > intervalInMs;
}
在这里,将当前时间减去指定时间窗口对象的起始时间,如果结果大于计数周期时长,则表明指定的时间窗口对象已经失效。