Koa.js WebSocket 实时通信与 IM 系统开发
WebSocket 概述
WebSocket 是一种在单个 TCP 连接上提供全双工通信的协议,相比传统的 HTTP 轮询,它具有更低延迟、更好性能的优点。在 Koa.js 应用中集成 WebSocket,可以实现实时聊天、在线状态、消息推送等功能。本文介绍 Koa.js 中 WebSocket 的集成方案和 IM 系统开发实践。
系统架构设计
实时通信系统采用分层架构:
| 层次 | 功能描述 | 技术选型 |
|---|---|---|
| 接入层 | WebSocket 连接管理、心跳检测 | ws 库 |
| 路由层 | 消息路由、群组管理 | Koa Router |
| 业务层 | 消息处理、好友关系、会话管理 | Node.js |
| 存储层 | 消息持久化、用户状态 | Redis + MySQL |
核心功能实现
1. WebSocket 服务集成
在 Koa 中集成 WebSocket 服务:
// WebSocket 服务
const WebSocket = require('ws');
const Koa = require('koa');
const http = require('http');
const app = new Koa();
const server = http.createServer(app.callback());
// 创建 WebSocket 服务器
const wss = new WebSocket.Server({ server });
// 连接管理
const clients = new Map(); // ws -> { userId, userInfo }
const userSockets = new Map(); // userId -> Set(ws)
// 心跳检测
const HEARTBEAT_INTERVAL = 30000; // 30秒
const CLIENT_TIMEOUT = 60000; // 60秒
wss.on('connection', (ws, req) => {
const clientIp = req.socket.remoteAddress;
console.log(`新连接: ${clientIp}`);
// 初始化客户端信息
ws.isAlive = true;
ws.userId = null;
ws.messageQueue = [];
// 心跳检测
ws.on('pong', () => {
ws.isAlive = true;
});
// 消息处理
ws.on('message', async (message) => {
try {
const data = JSON.parse(message);
await handleMessage(ws, data);
} catch (error) {
console.error('消息处理错误:', error);
ws.send(JSON.stringify({
type: 'error',
message: error.message
}));
}
});
// 关闭连接
ws.on('close', () => {
handleDisconnect(ws);
});
// 错误处理
ws.on('error', (error) => {
console.error('WebSocket 错误:', error);
handleDisconnect(ws);
});
// 发送欢迎消息
ws.send(JSON.stringify({
type: 'system',
message: '连接成功,请先登录'
}));
});
// 心跳检测定时器
const heartbeatInterval = setInterval(() => {
wss.clients.forEach((ws) => {
if (ws.isAlive === false) {
handleDisconnect(ws);
return ws.terminate();
}
ws.isAlive = false;
ws.ping();
});
}, HEARTBEAT_INTERVAL);
// 消息处理函数
async function handleMessage(ws, data) {
switch (data.type) {
case 'login':
await handleLogin(ws, data);
break;
case 'chat':
await handleChat(ws, data);
break;
case 'group_chat':
await handleGroupChat(ws, data);
break;
case 'heartbeat':
ws.send(JSON.stringify({ type: 'heartbeat_ack' }));
break;
default:
ws.send(JSON.stringify({
type: 'error',
message: `未知消息类型: ${data.type}`
}));
}
}
// 登录处理
async function handleLogin(ws, data) {
const { userId, token } = data;
// 验证 token(实际应调用认证服务)
const userInfo = await verifyToken(token);
if (!userInfo) {
ws.send(JSON.stringify({
type: 'login_failed',
message: '认证失败'
}));
return;
}
// 保存用户连接
ws.userId = userId;
ws.userInfo = userInfo;
if (!userSockets.has(userId)) {
userSockets.set(userId, new Set());
}
userSockets.get(userId).add(ws);
clients.set(ws, { userId, userInfo });
// 更新用户在线状态
await updateUserOnlineStatus(userId, true);
// 发送登录成功消息
ws.send(JSON.stringify({
type: 'login_success',
userId: userId,
userInfo: userInfo
}));
// 通知好友上线
await notifyFriendsOnline(userId);
}
2. 消息管理与存储
消息的存储和离线消息处理:
// 消息服务
class MessageService {
constructor(db, redis) {
this.db = db;
this.redis = redis;
}
// 发送消息
async sendMessage(msg) {
const message = {
id: this.generateMessageId(),
from: msg.from,
to: msg.to,
content: msg.content,
type: msg.type || 'text',
timestamp: Date.now(),
status: 'pending'
};
// 1. 检查接收者是否在线
const recipientSockets = userSockets.get(msg.to);
if (recipientSockets && recipientSockets.size > 0) {
// 在线,直接发送
message.status = 'delivered';
this.broadcastToUser(msg.to, {
type: 'new_message',
message: message
});
} else {
// 不在线,存入离线消息队列
await this.queueOfflineMessage(msg.to, message);
message.status = 'offline';
}
// 2. 持久化消息
await this.persistMessage(message);
// 3. 消息已读回执
if (msg.requireReceipt) {
// 等待接收者确认
await this.waitForReceipt(message.id, 30000);
}
return message;
}
// 群发消息
async sendGroupMessage(groupId, msg) {
// 获取群成员
const members = await this.getGroupMembers(groupId);
const results = [];
for (const memberId of members) {
if (memberId === msg.from) continue; // 跳过发送者
const result = await this.sendMessage({
...msg,
to: memberId,
groupId
});
results.push(result);
}
return results;
}
// 离线消息队列
async queueOfflineMessage(userId, message) {
const queueKey = `offline:${userId}`;
await this.redis.rpush(queueKey, JSON.stringify(message));
// 设置过期时间 7 天
await this.redis.expire(queueKey, 7 * 24 * 3600);
}
// 获取离线消息
async getOfflineMessages(userId) {
const queueKey = `offline:${userId}`;
const messages = await this.redis.lrange(queueKey, 0, -1);
if (messages.length > 0) {
// 清除离线消息
await this.redis.del(queueKey);
}
return messages.map(m => JSON.parse(m));
}
// 消息持久化
async persistMessage(message) {
await this.db.collection('messages').insertOne({
...message,
createdAt: new Date()
});
}
// 消息历史查询
async getMessageHistory(userId, contactId, options = {}) {
const { limit = 50, beforeTime } = options;
const query = {
$or: [
{ from: userId, to: contactId },
{ from: contactId, to: userId }
]
};
if (beforeTime) {
query.timestamp = { $lt: beforeTime };
}
return await this.db.collection('messages')
.find(query)
.sort({ timestamp: -1 })
.limit(limit)
.toArray();
}
// 消息已读
async markAsRead(messageId, userId) {
await this.db.collection('messages').updateOne(
{ id: messageId, to: userId },
{ $set: { read: true, readAt: new Date() } }
);
}
}
3. 在线状态管理
用户在线状态和好友上下线通知:
// 在线状态服务
class PresenceService {
constructor(redis) {
this.redis = redis;
}
// 用户上线
async userOnline(userId, deviceInfo = {}) {
const presenceKey = `presence:${userId}`;
const now = Date.now();
// 记录在线状态
await this.redis.hset(presenceKey, {
status: 'online',
lastSeen: now,
device: JSON.stringify(deviceInfo)
});
// 添加到在线列表
await this.redis.zadd('online_users', now, userId);
// 设置离线定时器(断连后更新状态)
await this.redis.expire(presenceKey, 65); // 略长于客户端超时
}
// 用户离线
async userOffline(userId) {
const presenceKey = `presence:${userId}`;
const now = Date.now();
// 更新离线状态
await this.redis.hset(presenceKey, {
status: 'offline',
lastSeen: now
});
// 从在线列表移除
await this.redis.zrem('online_users', userId);
}
// 获取用户状态
async getUserStatus(userId) {
const presenceKey = `presence:${userId}`;
const data = await this.redis.hgetall(presenceKey);
if (!data || !data.status) {
return { status: 'offline' };
}
return {
status: data.status,
lastSeen: parseInt(data.lastSeen),
device: data.device ? JSON.parse(data.device) : null
};
}
// 获取在线好友列表
async getOnlineFriends(userId) {
const friendIds = await this.getFriendIds(userId);
const onlineUsers = await this.redis.zrange('online_users', 0, -1);
const onlineSet = new Set(onlineUsers.map(Number));
return friendIds.filter(id => onlineSet.has(id));
}
// 批量获取用户状态
async getBatchUserStatus(userIds) {
const results = {};
for (const userId of userIds) {
results[userId] = await this.getUserStatus(userId);
}
return results;
}
// 通知好友上线/离线
async notifyFriends(userId, event) {
const friendIds = await this.getFriendIds(userId);
for (const friendId of friendIds) {
const friendSockets = userSockets.get(friendId);
if (friendSockets) {
const message = {
type: 'friend_presence',
userId: userId,
event: event // 'online' or 'offline'
};
this.broadcastToUser(friendId, message);
}
}
}
}
4. 消息可靠性保障
确保消息可靠送达:
// 消息可靠性保障
class MessageReliability {
constructor(messageService) {
this.pendingMessages = new Map(); // messageId -> { resolve, reject, timer }
}
// 发送消息并等待确认
async sendWithAck(ws, message, ackTimeout = 10000) {
return new Promise((resolve, reject) => {
const messageId = message.id || this.generateId();
// 设置确认超时
const timer = setTimeout(() => {
this.pendingMessages.delete(messageId);
reject(new Error('消息确认超时'));
}, ackTimeout);
this.pendingMessages.set(messageId, { resolve, reject, timer });
// 发送消息
ws.send(JSON.stringify({
...message,
id: messageId,
requireAck: true
}));
});
}
// 处理消息确认
handleAck(messageId, success) {
const pending = this.pendingMessages.get(messageId);
if (!pending) return;
clearTimeout(pending.timer);
this.pendingMessages.delete(messageId);
if (success) {
pending.resolve({ success: true, messageId });
} else {
pending.reject(new Error('消息发送失败'));
}
}
// 消息重发机制
async retryFailedMessages(ws) {
const failedMessages = await this.getFailedMessages();
for (const msg of failedMessages) {
const retryCount = msg.retryCount || 0;
if (retryCount >= 3) {
// 超过重试次数,标记失败
await this.markMessageFailed(msg.id);
continue;
}
// 重试发送
ws.send(JSON.stringify({
...msg,
retryCount: retryCount + 1,
isRetry: true
}));
// 等待确认
await this.sleep(2000);
}
}
}
性能优化策略
高并发 WebSocket 性能优化:
| 策略 | 实现方式 | 效果 |
|---|---|---|
| 连接复用 | 使用 HTTP/2 多路复用 | 减少 TCP 连接数 |
| 消息压缩 | 启用 WebSocket 压缩扩展 | 带宽降低 70% |
| 分布式部署 | 使用 Redis Pub/Sub | 支持水平扩展 |
| 消息队列 | 引入 Kafka/RabbitMQ | 峰值流量削峰 |
总结
WebSocket 为 Koa.js 应用提供了强大的实时通信能力。在实际开发中,需要注意连接管理、心跳检测、消息可靠性、水平扩展等问题。通过合理的架构设计和技术选型,可以构建稳定、高效的 IM 系统。