KOA技术分享

专注 Koa.js 框架的编程知识分享

Koa.js WebSocket 实时通信与 IM 系统开发

WebSocket 概述

WebSocket 是一种在单个 TCP 连接上提供全双工通信的协议,相比传统的 HTTP 轮询,它具有更低延迟、更好性能的优点。在 Koa.js 应用中集成 WebSocket,可以实现实时聊天、在线状态、消息推送等功能。本文介绍 Koa.js 中 WebSocket 的集成方案和 IM 系统开发实践。

系统架构设计

实时通信系统采用分层架构:

层次 功能描述 技术选型
接入层 WebSocket 连接管理、心跳检测 ws 库
路由层 消息路由、群组管理 Koa Router
业务层 消息处理、好友关系、会话管理 Node.js
存储层 消息持久化、用户状态 Redis + MySQL

核心功能实现

1. WebSocket 服务集成

在 Koa 中集成 WebSocket 服务:

// WebSocket 服务
const WebSocket = require('ws');
const Koa = require('koa');
const http = require('http');

const app = new Koa();
const server = http.createServer(app.callback());

// 创建 WebSocket 服务器
const wss = new WebSocket.Server({ server });

// 连接管理
const clients = new Map(); // ws -> { userId, userInfo }
const userSockets = new Map(); // userId -> Set(ws)

// 心跳检测
const HEARTBEAT_INTERVAL = 30000; // 30秒
const CLIENT_TIMEOUT = 60000; // 60秒

wss.on('connection', (ws, req) => {
  const clientIp = req.socket.remoteAddress;
  console.log(`新连接: ${clientIp}`);

  // 初始化客户端信息
  ws.isAlive = true;
  ws.userId = null;
  ws.messageQueue = [];

  // 心跳检测
  ws.on('pong', () => {
    ws.isAlive = true;
  });

  // 消息处理
  ws.on('message', async (message) => {
    try {
      const data = JSON.parse(message);
      await handleMessage(ws, data);
    } catch (error) {
      console.error('消息处理错误:', error);
      ws.send(JSON.stringify({
        type: 'error',
        message: error.message
      }));
    }
  });

  // 关闭连接
  ws.on('close', () => {
    handleDisconnect(ws);
  });

  // 错误处理
  ws.on('error', (error) => {
    console.error('WebSocket 错误:', error);
    handleDisconnect(ws);
  });

  // 发送欢迎消息
  ws.send(JSON.stringify({
    type: 'system',
    message: '连接成功,请先登录'
  }));
});

// 心跳检测定时器
const heartbeatInterval = setInterval(() => {
  wss.clients.forEach((ws) => {
    if (ws.isAlive === false) {
      handleDisconnect(ws);
      return ws.terminate();
    }
    ws.isAlive = false;
    ws.ping();
  });
}, HEARTBEAT_INTERVAL);

// 消息处理函数
async function handleMessage(ws, data) {
  switch (data.type) {
    case 'login':
      await handleLogin(ws, data);
      break;
    case 'chat':
      await handleChat(ws, data);
      break;
    case 'group_chat':
      await handleGroupChat(ws, data);
      break;
    case 'heartbeat':
      ws.send(JSON.stringify({ type: 'heartbeat_ack' }));
      break;
    default:
      ws.send(JSON.stringify({
        type: 'error',
        message: `未知消息类型: ${data.type}`
      }));
  }
}

// 登录处理
async function handleLogin(ws, data) {
  const { userId, token } = data;

  // 验证 token(实际应调用认证服务)
  const userInfo = await verifyToken(token);
  if (!userInfo) {
    ws.send(JSON.stringify({
      type: 'login_failed',
      message: '认证失败'
    }));
    return;
  }

  // 保存用户连接
  ws.userId = userId;
  ws.userInfo = userInfo;

  if (!userSockets.has(userId)) {
    userSockets.set(userId, new Set());
  }
  userSockets.get(userId).add(ws);
  clients.set(ws, { userId, userInfo });

  // 更新用户在线状态
  await updateUserOnlineStatus(userId, true);

  // 发送登录成功消息
  ws.send(JSON.stringify({
    type: 'login_success',
    userId: userId,
    userInfo: userInfo
  }));

  // 通知好友上线
  await notifyFriendsOnline(userId);
}

2. 消息管理与存储

消息的存储和离线消息处理:

// 消息服务
class MessageService {
  constructor(db, redis) {
    this.db = db;
    this.redis = redis;
  }

  // 发送消息
  async sendMessage(msg) {
    const message = {
      id: this.generateMessageId(),
      from: msg.from,
      to: msg.to,
      content: msg.content,
      type: msg.type || 'text',
      timestamp: Date.now(),
      status: 'pending'
    };

    // 1. 检查接收者是否在线
    const recipientSockets = userSockets.get(msg.to);

    if (recipientSockets && recipientSockets.size > 0) {
      // 在线,直接发送
      message.status = 'delivered';
      this.broadcastToUser(msg.to, {
        type: 'new_message',
        message: message
      });
    } else {
      // 不在线,存入离线消息队列
      await this.queueOfflineMessage(msg.to, message);
      message.status = 'offline';
    }

    // 2. 持久化消息
    await this.persistMessage(message);

    // 3. 消息已读回执
    if (msg.requireReceipt) {
      // 等待接收者确认
      await this.waitForReceipt(message.id, 30000);
    }

    return message;
  }

  // 群发消息
  async sendGroupMessage(groupId, msg) {
    // 获取群成员
    const members = await this.getGroupMembers(groupId);

    const results = [];
    for (const memberId of members) {
      if (memberId === msg.from) continue; // 跳过发送者
      const result = await this.sendMessage({
        ...msg,
        to: memberId,
        groupId
      });
      results.push(result);
    }

    return results;
  }

  // 离线消息队列
  async queueOfflineMessage(userId, message) {
    const queueKey = `offline:${userId}`;
    await this.redis.rpush(queueKey, JSON.stringify(message));
    // 设置过期时间 7 天
    await this.redis.expire(queueKey, 7 * 24 * 3600);
  }

  // 获取离线消息
  async getOfflineMessages(userId) {
    const queueKey = `offline:${userId}`;
    const messages = await this.redis.lrange(queueKey, 0, -1);

    if (messages.length > 0) {
      // 清除离线消息
      await this.redis.del(queueKey);
    }

    return messages.map(m => JSON.parse(m));
  }

  // 消息持久化
  async persistMessage(message) {
    await this.db.collection('messages').insertOne({
      ...message,
      createdAt: new Date()
    });
  }

  // 消息历史查询
  async getMessageHistory(userId, contactId, options = {}) {
    const { limit = 50, beforeTime } = options;

    const query = {
      $or: [
        { from: userId, to: contactId },
        { from: contactId, to: userId }
      ]
    };

    if (beforeTime) {
      query.timestamp = { $lt: beforeTime };
    }

    return await this.db.collection('messages')
      .find(query)
      .sort({ timestamp: -1 })
      .limit(limit)
      .toArray();
  }

  // 消息已读
  async markAsRead(messageId, userId) {
    await this.db.collection('messages').updateOne(
      { id: messageId, to: userId },
      { $set: { read: true, readAt: new Date() } }
    );
  }
}

3. 在线状态管理

用户在线状态和好友上下线通知:

// 在线状态服务
class PresenceService {
  constructor(redis) {
    this.redis = redis;
  }

  // 用户上线
  async userOnline(userId, deviceInfo = {}) {
    const presenceKey = `presence:${userId}`;
    const now = Date.now();

    // 记录在线状态
    await this.redis.hset(presenceKey, {
      status: 'online',
      lastSeen: now,
      device: JSON.stringify(deviceInfo)
    });

    // 添加到在线列表
    await this.redis.zadd('online_users', now, userId);

    // 设置离线定时器(断连后更新状态)
    await this.redis.expire(presenceKey, 65); // 略长于客户端超时
  }

  // 用户离线
  async userOffline(userId) {
    const presenceKey = `presence:${userId}`;
    const now = Date.now();

    // 更新离线状态
    await this.redis.hset(presenceKey, {
      status: 'offline',
      lastSeen: now
    });

    // 从在线列表移除
    await this.redis.zrem('online_users', userId);
  }

  // 获取用户状态
  async getUserStatus(userId) {
    const presenceKey = `presence:${userId}`;
    const data = await this.redis.hgetall(presenceKey);

    if (!data || !data.status) {
      return { status: 'offline' };
    }

    return {
      status: data.status,
      lastSeen: parseInt(data.lastSeen),
      device: data.device ? JSON.parse(data.device) : null
    };
  }

  // 获取在线好友列表
  async getOnlineFriends(userId) {
    const friendIds = await this.getFriendIds(userId);
    const onlineUsers = await this.redis.zrange('online_users', 0, -1);
    const onlineSet = new Set(onlineUsers.map(Number));

    return friendIds.filter(id => onlineSet.has(id));
  }

  // 批量获取用户状态
  async getBatchUserStatus(userIds) {
    const results = {};
    for (const userId of userIds) {
      results[userId] = await this.getUserStatus(userId);
    }
    return results;
  }

  // 通知好友上线/离线
  async notifyFriends(userId, event) {
    const friendIds = await this.getFriendIds(userId);

    for (const friendId of friendIds) {
      const friendSockets = userSockets.get(friendId);
      if (friendSockets) {
        const message = {
          type: 'friend_presence',
          userId: userId,
          event: event // 'online' or 'offline'
        };
        this.broadcastToUser(friendId, message);
      }
    }
  }
}

4. 消息可靠性保障

确保消息可靠送达:

// 消息可靠性保障
class MessageReliability {
  constructor(messageService) {
    this.pendingMessages = new Map(); // messageId -> { resolve, reject, timer }
  }

  // 发送消息并等待确认
  async sendWithAck(ws, message, ackTimeout = 10000) {
    return new Promise((resolve, reject) => {
      const messageId = message.id || this.generateId();

      // 设置确认超时
      const timer = setTimeout(() => {
        this.pendingMessages.delete(messageId);
        reject(new Error('消息确认超时'));
      }, ackTimeout);

      this.pendingMessages.set(messageId, { resolve, reject, timer });

      // 发送消息
      ws.send(JSON.stringify({
        ...message,
        id: messageId,
        requireAck: true
      }));
    });
  }

  // 处理消息确认
  handleAck(messageId, success) {
    const pending = this.pendingMessages.get(messageId);
    if (!pending) return;

    clearTimeout(pending.timer);
    this.pendingMessages.delete(messageId);

    if (success) {
      pending.resolve({ success: true, messageId });
    } else {
      pending.reject(new Error('消息发送失败'));
    }
  }

  // 消息重发机制
  async retryFailedMessages(ws) {
    const failedMessages = await this.getFailedMessages();

    for (const msg of failedMessages) {
      const retryCount = msg.retryCount || 0;
      if (retryCount >= 3) {
        // 超过重试次数,标记失败
        await this.markMessageFailed(msg.id);
        continue;
      }

      // 重试发送
      ws.send(JSON.stringify({
        ...msg,
        retryCount: retryCount + 1,
        isRetry: true
      }));

      // 等待确认
      await this.sleep(2000);
    }
  }
}

性能优化策略

高并发 WebSocket 性能优化:

策略 实现方式 效果
连接复用 使用 HTTP/2 多路复用 减少 TCP 连接数
消息压缩 启用 WebSocket 压缩扩展 带宽降低 70%
分布式部署 使用 Redis Pub/Sub 支持水平扩展
消息队列 引入 Kafka/RabbitMQ 峰值流量削峰

总结

WebSocket 为 Koa.js 应用提供了强大的实时通信能力。在实际开发中,需要注意连接管理、心跳检测、消息可靠性、水平扩展等问题。通过合理的架构设计和技术选型,可以构建稳定、高效的 IM 系统。

← 下一篇:Koa.js API文档自动生成与Swagger集成