KOA技术分享

专注 Koa.js 框架的编程知识分享

Koa.js 限流与流量控制策略实现

为什么需要限流

在生产环境中,API 接口面临着各种流量风险:恶意刷接口、DDoS 攻击、突发流量冲击等。限流(Rate Limiting)是保护服务稳定性的重要手段。本文介绍 Koa.js 中限流策略的实现方案,包括计数器限流、令牌桶算法、滑动窗口等常见方案。

限流算法对比

算法 优点 缺点 适用场景
固定窗口 实现简单 边界流量突发 简单场景
滑动窗口 平滑限流 实现稍复杂 API 限流
令牌桶 允许突发流量 需要额外存储 流量削峰
漏桶 流量平滑 不支持突发 消息队列

基于 Redis 的滑动窗口限流

使用 Redis 实现分布式限流,支持集群部署:

// 滑动窗口限流中间件
const Redis = require('ioredis');
const redis = new Redis();

const WINDOW_SIZE = 60; // 窗口大小(秒)
const MAX_REQUESTS = 100; // 窗口内最大请求数

// 滑动窗口限流函数
async function slidingWindowRateLimit(key, windowSize, maxRequests) {
  const now = Date.now();
  const windowStart = now - windowSize * 1000;

  // 使用 Redis 事务
  const multi = redis.multi();

  // 1. 移除窗口外的请求记录
  multi.zremrangebyscore(key, 0, windowStart);

  // 2. 获取当前窗口内的请求数
  multi.zcard(key);

  // 3. 如果未超限,添加新请求
  const results = await multi.exec();

  // results[0] 是 zremrangebyscore 的结果
  // results[1] 是 zcard 的结果(当前请求数)
  const currentCount = results[1][1];

  if (currentCount >= maxRequests) {
    // 获取最早请求的时间,计算等待时间
    const earliest = await redis.zrange(key, 0, 0, 'WITHSCORES');
    const waitTime = earliest.length > 1
      ? Math.ceil((parseInt(earliest[1]) + windowSize * 1000 - now) / 1000)
      : windowSize;

    return {
      allowed: false,
      retryAfter: waitTime,
      remaining: 0,
      resetTime: now + windowSize * 1000
    };
  }

  // 4. 添加当前请求到有序集合
  await redis.zadd(key, now, `${now}-${Math.random()}`);

  // 5. 设置过期时间
  await redis.expire(key, windowSize);

  return {
    allowed: true,
    remaining: maxRequests - currentCount - 1,
    resetTime: now + windowSize * 1000
  };
}

// Koa 中间件
async function rateLimitMiddleware(ctx, next) {
  // 根据用户 ID 或 IP 进行限流
  const userId = ctx.state.user?.id || ctx.ip;
  const key = `ratelimit:${userId}:${ctx.path}`;

  const result = await slidingWindowRateLimit(key, WINDOW_SIZE, MAX_REQUESTS);

  // 设置响应头
  ctx.set('X-RateLimit-Limit', MAX_REQUESTS);
  ctx.set('X-RateLimit-Remaining', result.remaining);
  ctx.set('X-RateLimit-Reset', Math.ceil(result.resetTime / 1000));

  if (!result.allowed) {
    ctx.set('Retry-After', result.retryAfter);
    ctx.status = 429;
    ctx.body = {
      error: 'Too Many Requests',
      message: '请求过于频繁,请稍后再试',
      retryAfter: result.retryAfter
    };
    return;
  }

  await next();
}

module.exports = { rateLimitMiddleware, slidingWindowRateLimit };

令牌桶算法实现

令牌桶算法允许一定程度的突发流量:

// 令牌桶限流器
class TokenBucket {
  constructor(rate, capacity) {
    this.rate = rate; // 每秒生成的令牌数
    this.capacity = capacity; // 桶的容量
    this.tokens = capacity; // 当前令牌数
    this.lastRefill = Date.now();
  }

  // 补充令牌
  refill() {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000; // 经过的秒数

    // 计算应该补充的令牌数
    const newTokens = elapsed * this.rate;
    this.tokens = Math.min(this.capacity, this.tokens + newTokens);
    this.lastRefill = now;
  }

  // 尝试获取令牌
  tryConsume(tokens = 1) {
    this.refill();

    if (this.tokens >= tokens) {
      this.tokens -= tokens;
      return {
        allowed: true,
        remainingTokens: this.tokens
      };
    }

    return {
      allowed: false,
      remainingTokens: this.tokens,
      waitTime: Math.ceil((tokens - this.tokens) / this.rate)
    };
  }
}

// 基于 Redis 的分布式令牌桶
class DistributedTokenBucket {
  constructor(redis, key, rate, capacity) {
    this.redis = redis;
    this.key = key;
    this.rate = rate;
    this.capacity = capacity;
  }

  async consume(tokens = 1) {
    const now = Date.now();
    const luaScript = `
      local key = KEYS[1]
      local rate = tonumber(ARGV[1])
      local capacity = tonumber(ARGV[2])
      local now = tonumber(ARGV[3])
      local tokens = tonumber(ARGV[4])

      -- 获取上次的令牌数和更新时间
      local lastTokens = tonumber(redis.call('hget', key, 'tokens') or capacity)
      local lastTime = tonumber(redis.call('hget', key, 'lastTime') or now)

      -- 计算应该补充的令牌数
      local elapsed = (now - lastTime) / 1000
      local newTokens = math.min(capacity, lastTokens + elapsed * rate)

      -- 尝试消耗令牌
      if newTokens >= tokens then
        newTokens = newTokens - tokens
        redis.call('hset', key, 'tokens', newTokens)
        redis.call('hset', key, 'lastTime', now)
        redis.call('expire', key, math.ceil(capacity / rate) + 1)
        return {1, newTokens}
      else
        local waitTime = math.ceil((tokens - newTokens) / rate)
        return {0, newTokens, waitTime}
      end
    `;

    const result = await this.redis.eval(
      luaScript,
      1,
      this.key,
      this.rate,
      this.capacity,
      now,
      tokens
    );

    if (result[0] === 1) {
      return { allowed: true, remainingTokens: result[1] };
    }

    return {
      allowed: false,
      remainingTokens: result[1],
      waitTime: result[2]
    };
  }
}

module.exports = { TokenBucket, DistributedTokenBucket };

多维度限流策略

根据不同维度设置不同的限流规则:

// 多维度限流配置
const rateLimitRules = {
  // 登录接口:严格限制
  '/api/auth/login': {
    window: 60,
    max: 5,
    message: '登录尝试过于频繁,请 1 分钟后再试'
  },
  '/api/auth/register': {
    window: 3600,
    max: 3,
    message: '注册过于频繁,请 1 小时后再试'
  },
  // 普通 API 接口
  '/api/': {
    window: 60,
    max: 100,
    message: '请求过于频繁'
  },
  // 文件上传接口:限制请求体大小和频率
  '/api/upload': {
    window: 60,
    max: 10,
    message: '上传过于频繁'
  },
  // 搜索接口
  '/api/search': {
    window: 60,
    max: 30,
    message: '搜索请求过于频繁'
  }
};

// 智能限流中间件
async function smartRateLimit(ctx, next) {
  const path = ctx.path;
  const rule = Object.entries(rateLimitRules)
    .find(([pattern]) => path.startsWith(pattern));

  if (!rule) {
    return await next();
  }

  const [pattern, config] = rule;
  const userId = ctx.state.user?.id || ctx.ip;
  const key = `ratelimit:${userId}:${pattern}`;

  const result = await slidingWindowRateLimit(key, config.window, config.max);

  ctx.set('X-RateLimit-Limit', config.max);
  ctx.set('X-RateLimit-Remaining', result.remaining);

  if (!result.allowed) {
    ctx.status = 429;
    ctx.body = { error: 'Too Many Requests', message: config.message };
    return;
  }

  await next();
}

总结

限流是保护 API 服务稳定性的重要手段。选择合适的限流算法需要根据业务场景来决定:

结合 Redis 可以实现分布式限流,支持集群部署。

← 下一篇:Koa.js WebSocket实时通信与IM系统开发