Koa.js 限流与流量控制策略实现
为什么需要限流
在生产环境中,API 接口面临着各种流量风险:恶意刷接口、DDoS 攻击、突发流量冲击等。限流(Rate Limiting)是保护服务稳定性的重要手段。本文介绍 Koa.js 中限流策略的实现方案,包括计数器限流、令牌桶算法、滑动窗口等常见方案。
限流算法对比
| 算法 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 固定窗口 | 实现简单 | 边界流量突发 | 简单场景 |
| 滑动窗口 | 平滑限流 | 实现稍复杂 | API 限流 |
| 令牌桶 | 允许突发流量 | 需要额外存储 | 流量削峰 |
| 漏桶 | 流量平滑 | 不支持突发 | 消息队列 |
基于 Redis 的滑动窗口限流
使用 Redis 实现分布式限流,支持集群部署:
// 滑动窗口限流中间件
const Redis = require('ioredis');
const redis = new Redis();
const WINDOW_SIZE = 60; // 窗口大小(秒)
const MAX_REQUESTS = 100; // 窗口内最大请求数
// 滑动窗口限流函数
async function slidingWindowRateLimit(key, windowSize, maxRequests) {
const now = Date.now();
const windowStart = now - windowSize * 1000;
// 使用 Redis 事务
const multi = redis.multi();
// 1. 移除窗口外的请求记录
multi.zremrangebyscore(key, 0, windowStart);
// 2. 获取当前窗口内的请求数
multi.zcard(key);
// 3. 如果未超限,添加新请求
const results = await multi.exec();
// results[0] 是 zremrangebyscore 的结果
// results[1] 是 zcard 的结果(当前请求数)
const currentCount = results[1][1];
if (currentCount >= maxRequests) {
// 获取最早请求的时间,计算等待时间
const earliest = await redis.zrange(key, 0, 0, 'WITHSCORES');
const waitTime = earliest.length > 1
? Math.ceil((parseInt(earliest[1]) + windowSize * 1000 - now) / 1000)
: windowSize;
return {
allowed: false,
retryAfter: waitTime,
remaining: 0,
resetTime: now + windowSize * 1000
};
}
// 4. 添加当前请求到有序集合
await redis.zadd(key, now, `${now}-${Math.random()}`);
// 5. 设置过期时间
await redis.expire(key, windowSize);
return {
allowed: true,
remaining: maxRequests - currentCount - 1,
resetTime: now + windowSize * 1000
};
}
// Koa 中间件
async function rateLimitMiddleware(ctx, next) {
// 根据用户 ID 或 IP 进行限流
const userId = ctx.state.user?.id || ctx.ip;
const key = `ratelimit:${userId}:${ctx.path}`;
const result = await slidingWindowRateLimit(key, WINDOW_SIZE, MAX_REQUESTS);
// 设置响应头
ctx.set('X-RateLimit-Limit', MAX_REQUESTS);
ctx.set('X-RateLimit-Remaining', result.remaining);
ctx.set('X-RateLimit-Reset', Math.ceil(result.resetTime / 1000));
if (!result.allowed) {
ctx.set('Retry-After', result.retryAfter);
ctx.status = 429;
ctx.body = {
error: 'Too Many Requests',
message: '请求过于频繁,请稍后再试',
retryAfter: result.retryAfter
};
return;
}
await next();
}
module.exports = { rateLimitMiddleware, slidingWindowRateLimit };
令牌桶算法实现
令牌桶算法允许一定程度的突发流量:
// 令牌桶限流器
class TokenBucket {
constructor(rate, capacity) {
this.rate = rate; // 每秒生成的令牌数
this.capacity = capacity; // 桶的容量
this.tokens = capacity; // 当前令牌数
this.lastRefill = Date.now();
}
// 补充令牌
refill() {
const now = Date.now();
const elapsed = (now - this.lastRefill) / 1000; // 经过的秒数
// 计算应该补充的令牌数
const newTokens = elapsed * this.rate;
this.tokens = Math.min(this.capacity, this.tokens + newTokens);
this.lastRefill = now;
}
// 尝试获取令牌
tryConsume(tokens = 1) {
this.refill();
if (this.tokens >= tokens) {
this.tokens -= tokens;
return {
allowed: true,
remainingTokens: this.tokens
};
}
return {
allowed: false,
remainingTokens: this.tokens,
waitTime: Math.ceil((tokens - this.tokens) / this.rate)
};
}
}
// 基于 Redis 的分布式令牌桶
class DistributedTokenBucket {
constructor(redis, key, rate, capacity) {
this.redis = redis;
this.key = key;
this.rate = rate;
this.capacity = capacity;
}
async consume(tokens = 1) {
const now = Date.now();
const luaScript = `
local key = KEYS[1]
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local tokens = tonumber(ARGV[4])
-- 获取上次的令牌数和更新时间
local lastTokens = tonumber(redis.call('hget', key, 'tokens') or capacity)
local lastTime = tonumber(redis.call('hget', key, 'lastTime') or now)
-- 计算应该补充的令牌数
local elapsed = (now - lastTime) / 1000
local newTokens = math.min(capacity, lastTokens + elapsed * rate)
-- 尝试消耗令牌
if newTokens >= tokens then
newTokens = newTokens - tokens
redis.call('hset', key, 'tokens', newTokens)
redis.call('hset', key, 'lastTime', now)
redis.call('expire', key, math.ceil(capacity / rate) + 1)
return {1, newTokens}
else
local waitTime = math.ceil((tokens - newTokens) / rate)
return {0, newTokens, waitTime}
end
`;
const result = await this.redis.eval(
luaScript,
1,
this.key,
this.rate,
this.capacity,
now,
tokens
);
if (result[0] === 1) {
return { allowed: true, remainingTokens: result[1] };
}
return {
allowed: false,
remainingTokens: result[1],
waitTime: result[2]
};
}
}
module.exports = { TokenBucket, DistributedTokenBucket };
多维度限流策略
根据不同维度设置不同的限流规则:
// 多维度限流配置
const rateLimitRules = {
// 登录接口:严格限制
'/api/auth/login': {
window: 60,
max: 5,
message: '登录尝试过于频繁,请 1 分钟后再试'
},
'/api/auth/register': {
window: 3600,
max: 3,
message: '注册过于频繁,请 1 小时后再试'
},
// 普通 API 接口
'/api/': {
window: 60,
max: 100,
message: '请求过于频繁'
},
// 文件上传接口:限制请求体大小和频率
'/api/upload': {
window: 60,
max: 10,
message: '上传过于频繁'
},
// 搜索接口
'/api/search': {
window: 60,
max: 30,
message: '搜索请求过于频繁'
}
};
// 智能限流中间件
async function smartRateLimit(ctx, next) {
const path = ctx.path;
const rule = Object.entries(rateLimitRules)
.find(([pattern]) => path.startsWith(pattern));
if (!rule) {
return await next();
}
const [pattern, config] = rule;
const userId = ctx.state.user?.id || ctx.ip;
const key = `ratelimit:${userId}:${pattern}`;
const result = await slidingWindowRateLimit(key, config.window, config.max);
ctx.set('X-RateLimit-Limit', config.max);
ctx.set('X-RateLimit-Remaining', result.remaining);
if (!result.allowed) {
ctx.status = 429;
ctx.body = { error: 'Too Many Requests', message: config.message };
return;
}
await next();
}
总结
限流是保护 API 服务稳定性的重要手段。选择合适的限流算法需要根据业务场景来决定:
- 固定窗口适用于简单场景
- 滑动窗口适用于 API 限流
- 令牌桶适用于需要允许突发流量的场景
- 多维度限流可以更精细地控制不同接口的访问
结合 Redis 可以实现分布式限流,支持集群部署。