Koa 集成 WebSocket 实现实时通信
WebSocket 与 Koa 的结合场景
WebSocket 提供了全双工通信能力,适合实时性要求高的场景:在线聊天、实时通知、数据推送、协同编辑等。Koa 本身不内置 WebSocket 支持,但可以通过 ws 库轻松集成。
基础集成方案
使用 ws 库与 Koa 的 HTTP 服务器共享端口:
const Koa = require('koa');
const WebSocket = require('ws');
const http = require('http');
const app = new Koa();
// Koa 常规路由
app.use(async ctx => {
ctx.body = 'Hello Koa';
});
// 创建 HTTP 服务器
const server = http.createServer(app.callback());
// 挂载 WebSocket 服务
const wss = new WebSocket.Server({ server });
wss.on('connection', (ws, req) => {
console.log('客户端连接:', req.socket.remoteAddress);
ws.on('message', (data) => {
console.log('收到消息:', data.toString());
ws.send(`服务端回复: ${data}`);
});
ws.on('close', () => {
console.log('客户端断开');
});
});
server.listen(3000, () => {
console.log('Server running on http://localhost:3000');
});
连接管理与房间分组
实际项目中需要对连接进行管理,实现定向推送:
class ConnectionManager {
constructor() {
this.clients = new Map(); // clientId -> ws
this.rooms = new Map(); // roomId -> Set(clientId)
}
add(clientId, ws) {
this.clients.set(clientId, ws);
ws.clientId = clientId;
}
joinRoom(clientId, roomId) {
if (!this.rooms.has(roomId)) {
this.rooms.set(roomId, new Set());
}
this.rooms.get(roomId).add(clientId);
}
broadcast(roomId, message) {
const room = this.rooms.get(roomId);
if (!room) return;
for (const clientId of room) {
const ws = this.clients.get(clientId);
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(message));
}
}
}
remove(clientId) {
this.clients.delete(clientId);
for (const room of this.rooms.values()) {
room.delete(clientId);
}
}
}
const manager = new ConnectionManager();
wss.on('connection', (ws) => {
ws.on('message', (data) => {
const msg = JSON.parse(data);
switch (msg.type) {
case 'auth':
manager.add(msg.clientId, ws);
break;
case 'join':
manager.joinRoom(ws.clientId, msg.roomId);
break;
case 'chat':
manager.broadcast(msg.roomId, {
from: ws.clientId,
content: msg.content,
time: Date.now()
});
break;
}
});
ws.on('close', () => {
if (ws.clientId) manager.remove(ws.clientId);
});
});
心跳保活机制
防止 NAT 超时和异常连接占用资源:
function heartbeat(ws) {
ws.isAlive = true;
}
wss.on('connection', (ws) => {
ws.isAlive = true;
ws.on('pong', () => heartbeat(ws));
ws.on('message', (data) => {
// 业务处理
});
});
// 定时检测
const interval = setInterval(() => {
wss.clients.forEach((ws) => {
if (!ws.isAlive) {
return ws.terminate();
}
ws.isAlive = false;
ws.ping();
});
}, 30000);
wss.on('close', () => {
clearInterval(interval);
});
与 Koa 中间件共享上下文
如果需要复用 Koa 的认证中间件,可以通过升级请求实现:
const jwt = require('jsonwebtoken');
wss.on('connection', async (ws, req) => {
// 从 URL 参数或 Cookie 获取 token
const url = new URL(req.url, 'http://localhost');
const token = url.searchParams.get('token');
try {
const user = jwt.verify(token, process.env.JWT_SECRET);
ws.user = user;
ws.send(JSON.stringify({ type: 'connected', userId: user.id }));
} catch (err) {
ws.close(1008, '认证失败');
}
});
总结
Koa 与 WebSocket 的结合并不复杂,关键在于连接管理、消息协议设计和异常处理。生产环境建议配合 Redis 实现多节点广播,使用 PM2 管理进程时需注意 WebSocket 的粘性会话配置。