KOA技术分享

专注 Koa.js 框架开发

Koa.js 分布式事务解决方案与实践

分布式事务概述

在微服务架构下,不同服务拥有独立的数据库,传统的单机事务不再适用。分布式事务需要解决数据一致性问题,本文介绍 Koa.js 项目中常用的分布式事务解决方案,包括 TCC、Saga、可靠消息等模式。

事务解决方案对比

方案 一致性 性能 复杂度 适用场景
2PC 强一致 单机数据库
TCC 最终一致 业务可控
Saga 最终一致 长流程
可靠消息 最终一致 异步调用

TCC 模式实现

TCC(Try-Confirm-Cancel)模式将业务逻辑拆分为三个阶段:

// TCC 事务管理器
class TCCTransactionManager {
  constructor() {
    this.transactions = new Map();
    this.sagaLock = require('redis-lock')(redisClient);
  }

  // 开启 TCC 事务
  async beginTCC(transactionId, participants) {
    const transaction = {
      id: transactionId,
      status: 'pending',
      participants: participants.map(p => ({
        service: p.service,
        tryUrl: p.tryUrl,
        confirmUrl: p.confirmUrl,
       CancelUrl: p.cancelUrl,
        payload: p.payload,
        status: 'pending'
      })),
      startTime: new Date()
    };

    this.transactions.set(transactionId, transaction);

    // 分布式锁,防止重复提交
    const lock = await this.sagaLock.acquire(`tcc:${transactionId}`, 5000);
    try {
      return await this.executeTCC(transactionId);
    } finally {
      await this.sagaLock.release(lock);
    }
  }

  // 执行 TCC 事务
  async executeTCC(transactionId) {
    const transaction = this.transactions.get(transactionId);

    // 第一阶段:Try 执行所有参与者的预留资源
    const tryResults = [];
    for (const participant of transaction.participants) {
      try {
        const result = await this.callService(participant.tryUrl, {
          transId: transactionId,
          payload: participant.payload
        });

        participant.status = 'confirmed';
        tryResults.push({ service: participant.service, success: true, result });
      } catch (error) {
        // Try 阶段失败,标记参与者状态
        participant.status = 'failed';
        tryResults.push({ service: participant.service, success: false, error: error.message });

        // 回滚已成功的 Try 操作
        await this.rollbackTCC(transaction, tryResults);
        transaction.status = 'rollback';
        throw error;
      }
    }

    transaction.status = 'confirming';

    // 第二阶段:Confirm 确认执行
    const confirmResults = [];
    for (const participant of transaction.participants) {
      if (participant.status !== 'confirmed') continue;

      try {
        await this.callService(participant.confirmUrl, {
          transId: transactionId,
          payload: participant.payload
        });

        participant.status = 'completed';
        confirmResults.push({ service: participant.service, success: true });
      } catch (error) {
        // Confirm 失败需要人工介入或重试
        participant.status = 'confirm_failed';
        confirmResults.push({ service: participant.service, success: false, error: error.message });

        // 记录待重试的任务
        await this.scheduleRetry('confirm', transactionId, participant);
      }
    }

    transaction.status = 'completed';
    return { transactionId, status: 'success', results: confirmResults };
  }

  // TCC 回滚
  async rollbackTCC(transaction, tryResults) {
    const successfulParticipants = transaction.participants
      .filter((p, i) => tryResults[i]?.success);

    for (const participant of successfulParticipants) {
      try {
        await this.callService(participant.cancelUrl, {
          transId: transaction.id,
          payload: participant.payload
        });

        participant.status = 'cancelled';
      } catch (error) {
        // 记录回滚失败,等待补偿
        participant.status = 'cancel_failed';
        await this.scheduleRetry('cancel', transaction.id, participant);
      }
    }
  }

  // 调用远程服务
  async callService(url, data) {
    const response = await ctx.axios.post(url, data, {
      timeout: 10000,
      headers: { 'X-Trans-Id': data.transId }
    });
    return response.data;
  }

  // 重试补偿任务
  async scheduleRetry(type, transactionId, participant) {
    const retryTask = {
      type,
      transactionId,
      participant,
      retryCount: 0,
      maxRetries: 5,
      nextRetryTime: new Date()
    };

    await this.retryTaskModel.create(retryTask);
  }
}

// 使用装饰器定义 TCC 接口
const tccParticipant = (serviceName, tryUrl, confirmUrl, cancelUrl) => {
  return (target, propertyKey, descriptor) => {
    const originalMethod = descriptor.value;

    descriptor.value = async function(ctx) {
      // 获取事务管理器
      const tccManager = ctx.tccManager;

      // 注册参与者
      const participant = {
        service: serviceName,
        tryUrl,
        confirmUrl,
        cancelUrl,
        payload: ctx.request.body
      };

      // 开启分布式事务
      const transId = ctx.get('X-Trans-Id') || uuidv4();
      ctx.set('X-Trans-Id', transId);

      return tccManager.addParticipant(transId, participant);
    };

    return descriptor;
  };
};

Saga 模式实现

Saga 模式将长流程拆分为多个本地事务,通过补偿机制保证最终一致:

// Saga 编排器
class SagaOrchestrator {
  constructor(sagaModel, compensationService) {
    this.Saga = sagaModel;
    this.Compensation = compensationService;
  }

  // 执行 Saga 流程
  async execute(sagaDefinition, initialPayload) {
    const sagaId = uuidv4();
    const saga = await this.Saga.create({
      id: sagaId,
      name: sagaDefinition.name,
      status: 'running',
      currentStep: 0,
      payload: initialPayload,
      completedSteps: [],
      startedAt: new Date()
    });

    const steps = sagaDefinition.steps;
    let currentPayload = initialPayload;

    try {
      for (let i = 0; i < steps.length; i++) {
        saga.currentStep = i;

        // 执行当前步骤
        const step = steps[i];
        const stepResult = await this.executeStep(sagaId, step, currentPayload);

        // 记录步骤执行结果
        await this.Saga.updateOne(
          { id: sagaId },
          {
            $push: {
              completedSteps: {
                stepName: step.name,
                status: 'success',
                result: stepResult,
                completedAt: new Date()
              }
            }
          }
        );

        // 将上一步的输出作为下一步的输入
        if (stepResult && step.outputMapper) {
          currentPayload = step.outputMapper(currentPayload, stepResult);
        }
      }

      // 所有步骤执行完成
      await this.Saga.updateOne(
        { id: sagaId },
        { status: 'completed', completedAt: new Date() }
      );

      return { sagaId, status: 'success', payload: currentPayload };

    } catch (error) {
      // 执行补偿
      await this.compensate(sagaId, saga.completedSteps);

      await this.Saga.updateOne(
        { id: sagaId },
        { status: 'compensated', error: error.message, compensatedAt: new Date() }
      );

      throw error;
    }
  }

  // 执行单个步骤
  async executeStep(sagaId, step, payload) {
    // 调用远程服务
    const response = await ctx.axios.post(step.serviceUrl, {
      sagaId,
      ...payload
    });

    return response.data;
  }

  // 执行补偿
  async compensate(sagaId, completedSteps) {
    // 倒序执行已成功步骤的补偿操作
    const reverseSteps = [...completedSteps].reverse();

    for (const step of reverseSteps) {
      if (!step.compensationUrl) continue;

      try {
        await ctx.axios.post(step.compensationUrl, {
          sagaId,
          originalResult: step.result
        });

        console.log(`Compensated step: ${step.stepName}`);
      } catch (error) {
        // 记录补偿失败
        await this.Compensation.create({
          sagaId,
          stepName: step.stepName,
          compensationUrl: step.compensationUrl,
          status: 'failed',
          error: error.message,
          retryCount: 0
        });
      }
    }
  }
}

// Saga 定义示例
const orderSagaDefinition = {
  name: 'create-order-saga',
  steps: [
    {
      name: 'create-order',
      serviceUrl: '/api/orders/create',
      compensationUrl: '/api/orders/cancel',
      outputMapper: (payload, result) => ({ ...payload, orderId: result.orderId })
    },
    {
      name: 'reserve-stock',
      serviceUrl: '/api/inventory/reserve',
      compensationUrl: '/api/inventory/release',
      outputMapper: (payload, result) => ({ ...payload, reservationId: result.reservationId })
    },
    {
      name: 'process-payment',
      serviceUrl: '/api/payment/charge',
      compensationUrl: '/api/payment/refund',
      outputMapper: (payload, result) => ({ ...payload, paymentId: result.paymentId })
    },
    {
      name: 'send-notification',
      serviceUrl: '/api/notification/send',
      // 最后一步没有补偿
      compensationUrl: null
    }
  ]
};

可靠消息方案

通过消息队列实现最终一致性:

// 可靠消息服务
class ReliableMessageService {
  constructor(messageQueue, localTransaction) {
    this.mq = messageQueue;
    this.Transaction = localTransaction;
  }

  // 发送可靠消息(本地事务+消息)
  async sendReliableMessage(topic, message, localTransactionCallback) {
    const messageId = uuidv4();
    const messageRecord = {
      id: messageId,
      topic,
      content: JSON.stringify(message),
      status: 'pending',
      createdAt: new Date(),
      retryCount: 0
    };

    // 开启本地事务
    const transaction = await this.Transaction.beginTransaction();

    try {
      // 1. 保存消息记录到本地数据库
      await this.MessageModel.create(messageRecord);

      // 2. 执行本地业务逻辑
      await localTransactionCallback(transaction);

      // 3. 提交本地事务
      await transaction.commit();

      // 4. 发送消息到消息队列
      await this.mq.send(topic, {
        messageId,
        ...message
      });

      // 5. 更新消息状态为已发送
      await this.MessageModel.updateOne(
        { id: messageId },
        { status: 'sent', sentAt: new Date() }
      );

      return { messageId, status: 'success' };

    } catch (error) {
      // 回滚本地事务
      await transaction.rollback();

      // 消息状态保持 pending,等待重试
      await this.MessageModel.updateOne(
        { id: messageId },
        { status: 'failed', error: error.message }
      );

      throw error;
    }
  }

  // 消费者处理消息
  async consumeMessage(topic, handler) {
    await this.mq.subscribe(topic, async (message) => {
      const messageId = message.messageId;

      // 检查消息是否已处理(幂等性)
      const existingMessage = await this.MessageModel.findOne({ id: messageId });
      if (existingMessage?.status === 'consumed') {
        console.log(`Message ${messageId} already consumed, skip`);
        return;
      }

      // 标记消息为处理中
      await this.MessageModel.updateOne(
        { id: messageId },
        { status: 'processing', processedAt: new Date() }
      );

      try {
        // 执行业务处理
        await handler(message);

        // 处理成功,更新状态
        await this.MessageModel.updateOne(
          { id: messageId },
          { status: 'consumed', consumedAt: new Date() }
        );

      } catch (error) {
        // 处理失败,记录错误并重试
        await this.MessageModel.updateOne(
          { id: messageId },
          {
            status: 'consume_failed',
            error: error.message,
            $inc: { retryCount: 1 }
          }
        );

        // 根据重试次数决定策略
        const msg = await this.MessageModel.findOne({ id: messageId });
        if (msg.retryCount < 3) {
          // 重试
          await this.mq.republish(topic, message);
        } else {
          // 进入死信队列
          await this.mq.sendDeadLetter(topic, message);
        }

        throw error;
      }
    });
  }

  // 消息重试机制
  async retryPendingMessages(options = {}) {
    const { maxRetries = 5, batchSize = 100 } = options;

    const pendingMessages = await this.MessageModel.find({
      status: { $in: ['pending', 'failed', 'consume_failed'] },
      retryCount: { $lt: maxRetries }
    }).limit(batchSize);

    for (const message of pendingMessages) {
      try {
        if (message.status === 'pending' || message.status === 'failed') {
          // 重新发送消息
          await this.mq.send(message.topic, JSON.parse(message.content));
        } else if (message.status === 'consume_failed') {
          // 重新消费
          await this.mq.republish(message.topic, JSON.parse(message.content));
        }

        await this.MessageModel.updateOne(
          { id: message.id },
          {
            status: 'sent',
            retryCount: message.retryCount + 1,
            lastRetryAt: new Date()
          }
        );

      } catch (error) {
        console.error(`Retry message ${message.id} failed:`, error);
      }
    }

    return { processed: pendingMessages.length };
  }
}

总结

分布式事务是微服务架构的核心挑战,选择合适的事务方案至关重要:

在实际项目中,需要根据业务特点和技术团队能力选择合适的方案。

← 下一篇:Koa.js 微服务架构设计与服务治理