KOA技术分享

专注 Koa.js 框架的编程知识分享

Koa 集成 WebSocket 实现实时通信

WebSocket 与 Koa 的结合场景

WebSocket 提供了全双工通信能力,适合实时性要求高的场景:在线聊天、实时通知、数据推送、协同编辑等。Koa 本身不内置 WebSocket 支持,但可以通过 ws 库轻松集成。

基础集成方案

使用 ws 库与 Koa 的 HTTP 服务器共享端口:

const Koa = require('koa');
const WebSocket = require('ws');
const http = require('http');

const app = new Koa();

// Koa 常规路由
app.use(async ctx => {
  ctx.body = 'Hello Koa';
});

// 创建 HTTP 服务器
const server = http.createServer(app.callback());

// 挂载 WebSocket 服务
const wss = new WebSocket.Server({ server });

wss.on('connection', (ws, req) => {
  console.log('客户端连接:', req.socket.remoteAddress);

  ws.on('message', (data) => {
    console.log('收到消息:', data.toString());
    ws.send(`服务端回复: ${data}`);
  });

  ws.on('close', () => {
    console.log('客户端断开');
  });
});

server.listen(3000, () => {
  console.log('Server running on http://localhost:3000');
});

连接管理与房间分组

实际项目中需要对连接进行管理,实现定向推送:

class ConnectionManager {
  constructor() {
    this.clients = new Map();      // clientId -> ws
    this.rooms = new Map();        // roomId -> Set(clientId)
  }

  add(clientId, ws) {
    this.clients.set(clientId, ws);
    ws.clientId = clientId;
  }

  joinRoom(clientId, roomId) {
    if (!this.rooms.has(roomId)) {
      this.rooms.set(roomId, new Set());
    }
    this.rooms.get(roomId).add(clientId);
  }

  broadcast(roomId, message) {
    const room = this.rooms.get(roomId);
    if (!room) return;
    for (const clientId of room) {
      const ws = this.clients.get(clientId);
      if (ws && ws.readyState === WebSocket.OPEN) {
        ws.send(JSON.stringify(message));
      }
    }
  }

  remove(clientId) {
    this.clients.delete(clientId);
    for (const room of this.rooms.values()) {
      room.delete(clientId);
    }
  }
}

const manager = new ConnectionManager();

wss.on('connection', (ws) => {
  ws.on('message', (data) => {
    const msg = JSON.parse(data);
    switch (msg.type) {
      case 'auth':
        manager.add(msg.clientId, ws);
        break;
      case 'join':
        manager.joinRoom(ws.clientId, msg.roomId);
        break;
      case 'chat':
        manager.broadcast(msg.roomId, {
          from: ws.clientId,
          content: msg.content,
          time: Date.now()
        });
        break;
    }
  });

  ws.on('close', () => {
    if (ws.clientId) manager.remove(ws.clientId);
  });
});

心跳保活机制

防止 NAT 超时和异常连接占用资源:

function heartbeat(ws) {
  ws.isAlive = true;
}

wss.on('connection', (ws) => {
  ws.isAlive = true;
  ws.on('pong', () => heartbeat(ws));

  ws.on('message', (data) => {
    // 业务处理
  });
});

// 定时检测
const interval = setInterval(() => {
  wss.clients.forEach((ws) => {
    if (!ws.isAlive) {
      return ws.terminate();
    }
    ws.isAlive = false;
    ws.ping();
  });
}, 30000);

wss.on('close', () => {
  clearInterval(interval);
});

与 Koa 中间件共享上下文

如果需要复用 Koa 的认证中间件,可以通过升级请求实现:

const jwt = require('jsonwebtoken');

wss.on('connection', async (ws, req) => {
  // 从 URL 参数或 Cookie 获取 token
  const url = new URL(req.url, 'http://localhost');
  const token = url.searchParams.get('token');

  try {
    const user = jwt.verify(token, process.env.JWT_SECRET);
    ws.user = user;
    ws.send(JSON.stringify({ type: 'connected', userId: user.id }));
  } catch (err) {
    ws.close(1008, '认证失败');
  }
});

总结

Koa 与 WebSocket 的结合并不复杂,关键在于连接管理、消息协议设计和异常处理。生产环境建议配合 Redis 实现多节点广播,使用 PM2 管理进程时需注意 WebSocket 的粘性会话配置。

下一篇:Koa 项目单元测试与接口测试实战 →