服务治理:限流
定义
限流是指在单位时间内限制接口的请求数量。其主要作用有两个:
- 保护服务的处理能力:避免因流量洪峰导致系统超载或崩溃。
- 防范恶意调用:防止人为或恶意的频繁请求耗尽系统资源。
限流位置
客户端:在客户端或前端直接限制用户的请求频率。例如,用户提交表单时设置按钮的防重复点击,或通过前端代码限制调用频次。这样可以有效降低无意义的请求传递到后端。
服务端:服务端限流是在后端对请求流量进行控制,通常分为以下两种方式:
网关层:通过 API 网关对流量进行统一管理和控制,集中式处理限流规则。网关的优势在于它可以集中管理所有微服务的流量,降低对服务本身的侵入性。
服务自集成:将限流逻辑集成到服务本身,每个服务根据自身的特性独立实现限流规则。这种方式灵活,但增加了服务的复杂性。
限流对象
- 用户:针对单个用户的请求数量进行限制,防止其频繁调用接口。例如,某个用户每分钟最多只能发送 10 个请求。
- IP 地址:限制来自单个 IP 地址的请求数量,防止某些 IP 通过批量调用接口引发资源占用。例如,限制每个 IP 每秒最多发起 100 个请求。
- 全局流量:针对所有请求的总量进行限制,以保护系统整体的稳定性。比如,限制系统每秒处理的总请求数为 10,000 个。
限流算法的实现
固定窗口算法
将时间划分为固定的窗口(如 1 秒或 1 分钟),在每个时间窗口内记录请求的数量。一旦达到限流阈值,后续请求会被直接拒绝。此算法实现简单,但在窗口切换时可能出现短时间内的流量激增问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main
import (
"sync"
"time"
)
type FixedWindowLimiter struct {
mu sync.Mutex
limit int
interval time.Duration
requests int
windowStart time.Time
}
func NewFixedWindowLimiter(limit int, interval time.Duration) *FixedWindowLimiter {
return &FixedWindowLimiter{
limit: limit,
interval: interval,
windowStart: time.Now(),
}
}
func (fw *FixedWindowLimiter) Allow() bool {
fw.mu.Lock()
defer fw.mu.Unlock()
now := time.Now()
if now.Sub(fw.windowStart) >= fw.interval {
// Reset the window
fw.requests = 0
fw.windowStart = now
}
if fw.requests < fw.limit {
fw.requests++
return true
}
return false
}
滑动窗口算法
滑动窗口通过更精细的时间粒度来记录请求数量。它按照滑动的方式统计最近时间段内的请求总数,可以更平滑地控制流量并避免固定窗口算法中的流量激增。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main
import (
"sync"
"time"
)
type SlidingWindowLimiter struct {
mu sync.Mutex
limit int // 请求限制
interval time.Duration // 时间窗口大小
buckets map[int64]int // 时间桶,key 为时间段,value 为请求数
}
func NewSlidingWindowLimiter(limit int, interval time.Duration) *SlidingWindowLimiter {
return &SlidingWindowLimiter{
limit: limit,
interval: interval,
buckets: make(map[int64]int),
}
}
func (sw *SlidingWindowLimiter) cleanUp(now int64) {
// 删除超过窗口的旧桶
for timestamp := range sw.buckets {
if timestamp < now-int64(sw.interval.Seconds()) {
delete(sw.buckets, timestamp)
}
}
}
func (sw *SlidingWindowLimiter) Allow() bool {
sw.mu.Lock()
defer sw.mu.Unlock()
now := time.Now().Unix() // 当前秒时间戳
sw.cleanUp(now) // 清理过期的时间桶
// 统计窗口内的请求总数
var count int
for _, reqCount := range sw.buckets {
count += reqCount
}
if count < sw.limit {
// 当前请求被允许,计入当前时间段
sw.buckets[now]++
return true
}
return false
}
漏桶算法
漏桶算法将请求放入一个固定大小的漏桶中,以恒定速率处理请求。当请求到达速率大于漏桶的漏水速率时,多余的请求会被丢弃。这种算法能够平滑流量,但不允许突发流量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main
import (
"sync"
"time"
)
type LeakyBucketLimiter struct {
mu sync.Mutex
limit int
interval time.Duration
lastLeak time.Time
currentWater int
}
func NewLeakyBucketLimiter(limit int, interval time.Duration) *LeakyBucketLimiter {
return &LeakyBucketLimiter{
limit: limit,
interval: interval,
lastLeak: time.Now(),
}
}
func (lb *LeakyBucketLimiter) Allow() bool {
lb.mu.Lock()
defer lb.mu.Unlock()
now := time.Now()
// Leak water
leakAmount := int(now.Sub(lb.lastLeak) / lb.interval)
if leakAmount > 0 {
lb.currentWater -= leakAmount
if lb.currentWater < 0 {
lb.currentWater = 0
}
lb.lastLeak = now
}
if lb.currentWater < lb.limit {
lb.currentWater++
return true
}
return false
}
令牌桶算法
令牌桶算法按照一定的速率生成令牌,请求必须获得令牌才能被处理。令牌桶允许短时间的突发流量,只要桶内有足够的令牌。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main
import (
"sync"
"time"
)
type TokenBucketLimiter struct {
mu sync.Mutex
capacity int
tokenRate int
currentTokens int
lastRefill time.Time
}
func NewTokenBucketLimiter(capacity int, tokenRate int) *TokenBucketLimiter {
return &TokenBucketLimiter{
capacity: capacity,
tokenRate: tokenRate,
currentTokens: capacity,
lastRefill: time.Now(),
}
}
func (tb *TokenBucketLimiter) Allow() bool {
tb.mu.Lock()
defer tb.mu.Unlock()
now := time.Now()
elapsed := now.Sub(tb.lastRefill).Seconds()
// Add new tokens
newTokens := int(elapsed * float64(tb.tokenRate))
tb.currentTokens += newTokens
if tb.currentTokens > tb.capacity {
tb.currentTokens = tb.capacity
}
tb.lastRefill = now
if tb.currentTokens > 0 {
tb.currentTokens--
return true
}
return false
}
总结
限流是保障系统稳定性和资源公平使用的重要手段。通过选择合适的限流位置(客户端或服务端)、对象(用户、IP 或全局流量)以及算法(固定窗口、滑动窗口、漏桶、令牌桶),可以在不同场景下满足系统的需求。同时,在限流的实现过程中,还需结合业务特性,综合考虑流量模式、性能开销和用户体验。
本文由作者按照 CC BY 4.0 进行授权