KOA技术分享

专注 Koa.js 框架的编程知识分享

Koa.js API 网关设计与请求路由转发

API 网关概述

API 网关是微服务架构中的核心组件,负责请求路由、负载均衡、认证授权、限流熔断等功能。本文介绍如何使用 Koa.js 构建高性能的 API 网关,涵盖服务发现、请求转发、熔断降级等核心功能。

网关架构设计

API 网关的整体架构:

层次 功能 技术实现
接入层 SSL 终结、连接管理 Nginx / Koa
路由层 请求路由、路径改写 Koa Router
认证层 JWT 验证、权限检查 koa-jwt
限流层 请求限流、熔断降级 Redis / Circuit Breaker
转发层 负载均衡、协议转换 http-proxy-middleware

基础路由转发实现

使用 Koa 构建基础的反向代理网关:

const Koa = require('koa');
const Router = require('koa-router');
const proxy = require('koa-http-proxy');
const axios = require('axios');

const app = new Koa();
const router = new Router();

// 服务注册表
const serviceRegistry = {
  'user-service': ['http://localhost:3001', 'http://localhost:3002'],
  'order-service': ['http://localhost:4001', 'http://localhost:4002'],
  'product-service': ['http://localhost:5001', 'http://localhost:5002']
};

// 负载均衡:轮询
const roundRobin = {};
function getTarget(serviceName) {
  const servers = serviceRegistry[serviceName];
  if (!servers || servers.length === 0) {
    return null;
  }

  if (!roundRobin[serviceName]) {
    roundRobin[serviceName] = 0;
  }

  const target = servers[roundRobin[serviceName] % servers.length];
  roundRobin[serviceName]++;

  return target;
}

// 请求路由中间件
async function routeMiddleware(ctx, next) {
  const path = ctx.path;

  // 解析服务名和路径 /api/user -> user-service
  const match = path.match(/^\/api\/(\w+)\/(.*)$/);
  if (!match) {
    ctx.status = 404;
    ctx.body = { error: '路由未找到' };
    return;
  }

  const [, serviceName, servicePath] = match;
  const target = getTarget(serviceName);

  if (!target) {
    ctx.status = 502;
    ctx.body = { error: '服务不可用' };
    return;
  }

  const targetUrl = `${target}/${servicePath}`;

  try {
    // 转发请求
    const response = await axios({
      method: ctx.method,
      url: targetUrl,
      headers: {
        ...ctx.headers,
        host: new URL(target).host // 修改 Host 头
      },
      data: ctx.request.body,
      params: ctx.query,
      timeout: 5000
    });

    ctx.status = response.status;
    ctx.body = response.data;
  } catch (error) {
    ctx.status = error.response?.status || 502;
    ctx.body = {
      error: '后端服务错误',
      message: error.message
    };
  }
}

// 使用 koa-http-proxy 的方式
const apiProxy = proxy('/api/', {
  target: 'http://localhost:3000',
  changeOrigin: true,
  pathRewrite: {
    '^/api/': '/'
  }
});

router.get('/api/user/:id', async (ctx) => {
  const target = getTarget('user-service');
  const response = await axios.get(`${target}/users/${ctx.params.id}`);
  ctx.body = response.data;
});

app.use(router.routes());
app.use(router.allowedMethods());

app.listen(8080, () => {
  console.log('API Gateway running on port 8080');
});

服务健康检查与熔断机制

实现服务熔断,防止故障扩散:

// 熔断器实现
class CircuitBreaker {
  constructor(options = {}) {
    this.failureThreshold = options.failureThreshold || 5; // 失败次数阈值
    this.successThreshold = options.successThreshold || 2; // 恢复需要的成功次数
    this.timeout = options.timeout || 60000; // 熔断持续时间(毫秒)

    this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
    this.failureCount = 0;
    this.successCount = 0;
    this.lastFailureTime = null;
  }

  // 执行请求
  async execute(promise) {
    if (this.state === 'OPEN') {
      // 检查是否应该进入半开状态
      if (Date.now() - this.lastFailureTime >= this.timeout) {
        this.state = 'HALF_OPEN';
        console.log('Circuit breaker: OPEN -> HALF_OPEN');
      } else {
        throw new Error('Circuit breaker is OPEN');
      }
    }

    try {
      const result = await promise;
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  onSuccess() {
    this.failureCount = 0;

    if (this.state === 'HALF_OPEN') {
      this.successCount++;
      if (this.successCount >= this.successThreshold) {
        this.state = 'CLOSED';
        this.successCount = 0;
        console.log('Circuit breaker: HALF_OPEN -> CLOSED');
      }
    }
  }

  onFailure() {
    this.failureCount++;
    this.lastFailureTime = Date.now();

    if (this.state === 'HALF_OPEN') {
      this.state = 'OPEN';
      console.log('Circuit breaker: HALF_OPEN -> OPEN');
    } else if (this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
      console.log('Circuit breaker: CLOSED -> OPEN');
    }
  }

  getState() {
    return this.state;
  }
}

// 服务健康检查
class HealthChecker {
  constructor() {
    this.services = new Map();
    this.checkInterval = 10000; // 10 秒检查一次
  }

  // 注册服务
  registerService(name, endpoints) {
    this.services.set(name, {
      endpoints: endpoints.map(url => ({
        url,
        healthy: true,
        lastCheck: null,
        responseTime: 0,
        failures: 0
      })),
      breaker: new Circuitbreaker()
    });

    // 启动健康检查
    this.startHealthCheck(name);
  }

  // 健康检查
  async checkService(name) {
    const service = this.services.get(name);
    if (!service) return null;

    for (const endpoint of service.endpoints) {
      const start = Date.now();
      try {
        const response = await axios.get(`${endpoint.url}/health`, {
          timeout: 3000
        });

        endpoint.healthy = response.status === 200;
        endpoint.responseTime = Date.now() - start;
        endpoint.failures = 0;
        endpoint.lastCheck = new Date();
      } catch (error) {
        endpoint.healthy = false;
        endpoint.failures++;
        endpoint.lastCheck = new Date();
      }
    }

    return service.endpoints.some(e => e.healthy) ? service.endpoints : null;
  }

  // 选择健康的服务
  selectEndpoint(name) {
    const service = this.services.get(name);
    if (!service) return null;

    const healthy = service.endpoints
      .filter(e => e.healthy)
      .sort((a, b) => a.responseTime - b.responseTime);

    return healthy.length > 0 ? healthy[0] : null;
  }

  startHealthCheck(name) {
    setInterval(() => this.checkService(name), this.checkInterval);
  }
}

const healthChecker = new HealthChecker();
healthChecker.registerService('user-service', ['http://localhost:3001', 'http://localhost:3002']);
healthChecker.registerService('order-service', ['http://localhost:4001', 'http://localhost:4002']);

请求限流与熔断

结合限流和熔断保护后端服务:

// 网关限流配置
const rateLimitConfig = {
  global: {
    window: 60,
    max: 1000
  },
  byService: {
    'user-service': { window: 60, max: 200 },
    'order-service': { window: 60, max: 500 },
    'product-service': { window: 60, max: 300 }
  },
  byEndpoint: {
    '/api/auth/login': { window: 60, max: 5 },
    '/api/payment/': { window: 60, max: 10 }
  }
};

// 限流中间件
async function gatewayRateLimit(ctx, next) {
  const userId = ctx.state.user?.id || ctx.ip;
  const service = ctx.params.service;

  // 获取对应服务的限流配置
  const serviceLimit = rateLimitConfig.byService[service];
  const config = serviceLimit || rateLimitConfig.global;

  const key = `gateway:ratelimit:${userId}:${service}`;
  const result = await slidingWindowRateLimit(key, config.window, config.max);

  ctx.set('X-RateLimit-Limit', config.max);
  ctx.set('X-RateLimit-Remaining', result.remaining);

  if (!result.allowed) {
    ctx.status = 429;
    return;
  }

  await next();
}

// 完整的网关中间件链
app.use(errorHandling);      // 错误处理
app.use(requestId);          // 请求 ID 生成
app.use(logger);             // 日志记录
app.use(rateLimitConfig);    // 全局限流
app.use(authMiddleware);     // 认证
app.use(serviceRouter);      // 服务路由
app.use(gatewayRateLimit);   // 服务限流
app.use(proxyHandler);       // 请求转发

动态路由与服务发现

支持服务动态注册和发现:

// 服务注册中心
const serviceRegistry = new Map();

// 服务注册
function registerService(serviceName, endpoint) {
  if (!serviceRegistry.has(serviceName)) {
    serviceRegistry.set(serviceName, {
      endpoints: new Set(),
      metadata: {}
    });
  }

  serviceRegistry.get(serviceName).endpoints.add(endpoint);
  console.log(`Service registered: ${serviceName} -> ${endpoint}`);
}

// 服务注销
function deregisterService(serviceName, endpoint) {
  const service = serviceRegistry.get(serviceName);
  if (service) {
    service.endpoints.delete(endpoint);
    console.log(`Service deregistered: ${serviceName} -> ${endpoint}`);
  }
}

// 获取健康的服务实例
function getHealthyEndpoints(serviceName) {
  const service = serviceRegistry.get(serviceName);
  if (!service) return [];

  return Array.from(service.endpoints).filter(endpoint => {
    const health = serviceHealth.get(`${serviceName}:${endpoint}`);
    return health?.healthy !== false;
  });
}

// 基于 Consul 的服务发现
const consul = require('consul')({ host: 'localhost', port: 8500 });

async function discoverFromConsul(serviceName) {
  const services = await consul.catalog.service.nodes(serviceName);
  return services.map(node => `http://${node.Address}:${node.ServicePort}`);
}

// 定时同步服务列表
setInterval(async () => {
  for (const [name, config] of targetServices) {
    try {
      const endpoints = await discoverFromConsul(name);
      serviceRegistry.set(name, {
        endpoints,
        lastUpdate: Date.now()
      });
    } catch (error) {
      console.error(`Service discovery failed for ${name}:`, error);
    }
  }
}, 30000); // 每 30 秒同步一次

总结

Koa.js API 网关是微服务架构的核心组件,主要功能包括:

通过合理的架构设计,可以构建高可用、高性能的 API 网关。

← 下一篇:Koa.js限流与流量控制策略实现