Koa.js 分布式事务解决方案与实践
分布式事务概述
在微服务架构下,不同服务拥有独立的数据库,传统的单机事务不再适用。分布式事务需要解决数据一致性问题,本文介绍 Koa.js 项目中常用的分布式事务解决方案,包括 TCC、Saga、可靠消息等模式。
事务解决方案对比
| 方案 | 一致性 | 性能 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| 2PC | 强一致 | 低 | 中 | 强一致要求、支持 XA 的数据库 |
| TCC | 最终一致 | 高 | 高 | 业务可控 |
| Saga | 最终一致 | 高 | 中 | 长流程 |
| 可靠消息 | 最终一致 | 中 | 低 | 异步调用 |
TCC 模式实现
TCC(Try-Confirm-Cancel)模式将业务逻辑拆分为三个阶段:Try(检查并预留资源)、Confirm(确认并真正执行)、Cancel(取消并释放预留的资源)。实现示例如下:
/**
 * Coordinates TCC (Try-Confirm-Cancel) transactions across remote participants.
 *
 * Transaction records are kept in an in-process Map. Each participant is
 * described by three endpoints (try / confirm / cancel) plus a payload that is
 * forwarded to all of them.
 */
class TCCTransactionManager {
  constructor() {
    // transactionId -> transaction record.
    this.transactions = new Map();
    // NOTE(review): `redisClient` is not defined in this class; it has to be
    // provided by the enclosing module scope.
    this.sagaLock = require('redis-lock')(redisClient);
  }

  /**
   * Registers a transaction and runs it under a per-transaction lock.
   *
   * @param {string} transactionId - Identifier of the transaction.
   * @param {Array<{service: string, tryUrl: string, confirmUrl: string, cancelUrl: string, payload: *}>} participants
   * @returns {Promise<{transactionId: string, status: string, results: Array}>} Result of executeTCC.
   * @throws Rethrows the error of the first failing Try call.
   */
  async beginTCC(transactionId, participants) {
    const transaction = {
      id: transactionId,
      status: 'pending',
      participants: participants.map((p) => ({
        service: p.service,
        tryUrl: p.tryUrl,
        confirmUrl: p.confirmUrl,
        // Must be spelled `cancelUrl`: rollbackTCC reads participant.cancelUrl.
        cancelUrl: p.cancelUrl,
        payload: p.payload,
        status: 'pending',
      })),
      startTime: new Date(),
    };
    this.transactions.set(transactionId, transaction);

    // Lock on the transaction id to guard against duplicate submission.
    const lock = await this.sagaLock.acquire(`tcc:${transactionId}`, 5000);
    try {
      return await this.executeTCC(transactionId);
    } finally {
      // Always release, whether the transaction succeeded or threw.
      await this.sagaLock.release(lock);
    }
  }

  /**
   * Runs the Try phase for every participant and then the Confirm phase.
   *
   * @param {string} transactionId - Id previously registered via beginTCC.
   * @returns {Promise<{transactionId: string, status: string, results: Array}>}
   * @throws {Error} When the transaction id is unknown, or when a Try call
   *   fails (after the already-reserved participants have been cancelled).
   */
  async executeTCC(transactionId) {
    const transaction = this.transactions.get(transactionId);
    if (!transaction) {
      throw new Error(`Unknown TCC transaction: ${transactionId}`);
    }

    // Phase 1: Try - reserve resources on every participant, in order.
    const tryResults = [];
    for (const participant of transaction.participants) {
      try {
        const result = await this.callService(participant.tryUrl, {
          transId: transactionId,
          payload: participant.payload,
        });
        // 'confirmed' here means "Try succeeded"; phase 2 only processes
        // participants carrying this status.
        participant.status = 'confirmed';
        tryResults.push({ service: participant.service, success: true, result });
      } catch (error) {
        participant.status = 'failed';
        tryResults.push({ service: participant.service, success: false, error: error.message });
        // Cancel the reservations that were already made, then surface the error.
        await this.rollbackTCC(transaction, tryResults);
        transaction.status = 'rollback';
        throw error;
      }
    }

    transaction.status = 'confirming';

    // Phase 2: Confirm - commit every reservation.
    const confirmResults = [];
    for (const participant of transaction.participants) {
      if (participant.status !== 'confirmed') continue;
      try {
        await this.callService(participant.confirmUrl, {
          transId: transactionId,
          payload: participant.payload,
        });
        participant.status = 'completed';
        confirmResults.push({ service: participant.service, success: true });
      } catch (error) {
        // A failed Confirm is not rolled back; it is queued for retry instead.
        participant.status = 'confirm_failed';
        confirmResults.push({ service: participant.service, success: false, error: error.message });
        await this.scheduleRetry('confirm', transactionId, participant);
      }
    }

    // NOTE(review): the transaction is reported as completed/success even when
    // some Confirm calls failed and are still pending retry; callers must
    // inspect `results` for per-participant outcomes.
    transaction.status = 'completed';
    return { transactionId, status: 'success', results: confirmResults };
  }

  /**
   * Calls the cancel endpoint of every participant whose Try succeeded.
   *
   * @param {object} transaction - Transaction record created by beginTCC.
   * @param {Array<{success: boolean}>} tryResults - Try outcomes, index-aligned
   *   with `transaction.participants` (executeTCC pushes them in the same order).
   * @returns {Promise<void>}
   */
  async rollbackTCC(transaction, tryResults) {
    const successfulParticipants = transaction.participants
      .filter((p, i) => tryResults[i]?.success);

    for (const participant of successfulParticipants) {
      try {
        await this.callService(participant.cancelUrl, {
          transId: transaction.id,
          payload: participant.payload,
        });
        participant.status = 'cancelled';
      } catch (error) {
        // Cancel failed: record it and leave it to the retry task.
        participant.status = 'cancel_failed';
        await this.scheduleRetry('cancel', transaction.id, participant);
      }
    }
  }

  /**
   * POSTs `data` to a participant endpoint and returns the response body.
   *
   * @param {string} url - Endpoint to call.
   * @param {{transId: string, payload: *}} data - Request body; `transId` is
   *   also sent as the X-Trans-Id header.
   * @returns {Promise<*>} `response.data` of the HTTP call.
   */
  async callService(url, data) {
    // NOTE(review): `ctx` is not defined in this class; it has to be available
    // in the enclosing scope for this call to work.
    const response = await ctx.axios.post(url, data, {
      timeout: 10000,
      headers: { 'X-Trans-Id': data.transId },
    });
    return response.data;
  }

  /**
   * Persists a retry task for a failed Confirm or Cancel call.
   *
   * @param {string} type - 'confirm' or 'cancel'.
   * @param {string} transactionId - Owning transaction.
   * @param {object} participant - Participant record to retry.
   * @returns {Promise<void>}
   */
  async scheduleRetry(type, transactionId, participant) {
    const retryTask = {
      type,
      transactionId,
      participant,
      retryCount: 0,
      maxRetries: 5,
      nextRetryTime: new Date(),
    };
    // NOTE(review): `this.retryTaskModel` is not assigned in the constructor;
    // it must be set on the instance before a retry is scheduled.
    await this.retryTaskModel.create(retryTask);
  }
}
/**
 * Method-decorator factory that turns a handler into a TCC participant
 * registration.
 *
 * The decorated method is replaced: the replacement does not invoke the
 * original method body. It builds a participant from the request body and
 * hands it to `ctx.tccManager.addParticipant`.
 *
 * @param {string} serviceName - Name reported as the participant's service.
 * @param {string} tryUrl - Endpoint for the Try phase.
 * @param {string} confirmUrl - Endpoint for the Confirm phase.
 * @param {string} cancelUrl - Endpoint for the Cancel phase.
 * @returns {Function} Decorator `(target, propertyKey, descriptor) => descriptor`.
 */
const tccParticipant = (serviceName, tryUrl, confirmUrl, cancelUrl) => {
  const decorate = (target, propertyKey, descriptor) => {
    descriptor.value = async function (ctx) {
      const { tccManager } = ctx;
      const payload = ctx.request.body;

      // Reuse the caller's transaction id when present; `||` (not `??`) so an
      // empty header value also falls back to a fresh id.
      const incomingId = ctx.get('X-Trans-Id');
      const transId = incomingId || uuidv4();
      ctx.set('X-Trans-Id', transId);

      return tccManager.addParticipant(transId, {
        service: serviceName,
        tryUrl,
        confirmUrl,
        cancelUrl,
        payload,
      });
    };
    return descriptor;
  };
  return decorate;
};
Saga 模式实现
Saga 模式将长流程拆分为多个本地事务,通过补偿机制保证最终一致:
/**
 * Orchestrates a Saga: runs a list of remote steps in order and, when one
 * fails, calls the compensation endpoint of every step that already finished,
 * newest first.
 */
class SagaOrchestrator {
  /**
   * @param {object} sagaModel - Store exposing `create(doc)` and `updateOne(filter, update)`.
   * @param {object} compensationService - Store exposing `create(record)`, used
   *   to record compensations that failed.
   */
  constructor(sagaModel, compensationService) {
    this.Saga = sagaModel;
    this.Compensation = compensationService;
  }

  /**
   * Executes every step of `sagaDefinition` sequentially.
   *
   * @param {{name: string, steps: Array<{name: string, serviceUrl: string, compensationUrl: ?string, outputMapper: Function=}>}} sagaDefinition
   * @param {object} initialPayload - Payload passed to the first step.
   * @returns {Promise<{sagaId: string, status: string, payload: object}>}
   * @throws Rethrows the error of the failing step after compensation has run.
   */
  async execute(sagaDefinition, initialPayload) {
    const sagaId = uuidv4();
    const saga = await this.Saga.create({
      id: sagaId,
      name: sagaDefinition.name,
      status: 'running',
      currentStep: 0,
      payload: initialPayload,
      completedSteps: [],
      startedAt: new Date(),
    });

    const steps = sagaDefinition.steps;
    let currentPayload = initialPayload;

    // Steps that finished in THIS run, with what compensate() needs
    // (compensationUrl, result). The object returned by create() is never
    // refreshed, so its `completedSteps` would always be empty here.
    const completedSteps = [];

    try {
      for (let i = 0; i < steps.length; i++) {
        saga.currentStep = i;
        const step = steps[i];
        const stepResult = await this.executeStep(sagaId, step, currentPayload);

        // Record locally before persisting, so the step is still compensated
        // if the bookkeeping write below fails.
        completedSteps.push({
          stepName: step.name,
          compensationUrl: step.compensationUrl,
          result: stepResult,
        });

        await this.Saga.updateOne(
          { id: sagaId },
          {
            $push: {
              completedSteps: {
                stepName: step.name,
                status: 'success',
                result: stepResult,
                completedAt: new Date(),
              },
            },
          }
        );

        // Feed this step's output into the next step's input.
        if (stepResult && step.outputMapper) {
          currentPayload = step.outputMapper(currentPayload, stepResult);
        }
      }

      await this.Saga.updateOne(
        { id: sagaId },
        { status: 'completed', completedAt: new Date() }
      );
      return { sagaId, status: 'success', payload: currentPayload };
    } catch (error) {
      await this.compensate(sagaId, completedSteps);
      await this.Saga.updateOne(
        { id: sagaId },
        { status: 'compensated', error: error.message, compensatedAt: new Date() }
      );
      throw error;
    }
  }

  /**
   * Runs a single step by POSTing `{ sagaId, ...payload }` to its service URL.
   *
   * @param {string} sagaId
   * @param {{serviceUrl: string}} step
   * @param {object} payload
   * @returns {Promise<*>} `response.data` of the HTTP call.
   */
  async executeStep(sagaId, step, payload) {
    // NOTE(review): `ctx` is not defined in this class; it has to be available
    // in the enclosing scope.
    const response = await ctx.axios.post(step.serviceUrl, {
      sagaId,
      ...payload,
    });
    return response.data;
  }

  /**
   * Compensates completed steps in reverse order. A failing compensation is
   * recorded via the compensation store and does not stop the remaining ones.
   *
   * @param {string} sagaId
   * @param {Array<{stepName: string, compensationUrl: ?string, result: *}>} completedSteps
   * @returns {Promise<void>}
   */
  async compensate(sagaId, completedSteps) {
    const reverseSteps = [...completedSteps].reverse();
    for (const step of reverseSteps) {
      // Steps without a compensation endpoint are skipped.
      if (!step.compensationUrl) continue;
      try {
        await ctx.axios.post(step.compensationUrl, {
          sagaId,
          originalResult: step.result,
        });
        console.log(`Compensated step: ${step.stepName}`);
      } catch (error) {
        await this.Compensation.create({
          sagaId,
          stepName: step.stepName,
          compensationUrl: step.compensationUrl,
          status: 'failed',
          error: error.message,
          retryCount: 0,
        });
      }
    }
  }
}
/**
 * Example Saga definition for order creation.
 *
 * Each step names the service endpoint to call, the endpoint that undoes it,
 * and (optionally) an `outputMapper` that copies one id from the step result
 * into the payload handed to the next step.
 */
const orderSagaDefinition = {
  name: 'create-order-saga',
  steps: [
    {
      name: 'create-order',
      serviceUrl: '/api/orders/create',
      compensationUrl: '/api/orders/cancel',
      outputMapper(payload, { orderId }) {
        return { ...payload, orderId };
      },
    },
    {
      name: 'reserve-stock',
      serviceUrl: '/api/inventory/reserve',
      compensationUrl: '/api/inventory/release',
      outputMapper(payload, { reservationId }) {
        return { ...payload, reservationId };
      },
    },
    {
      name: 'process-payment',
      serviceUrl: '/api/payment/charge',
      compensationUrl: '/api/payment/refund',
      outputMapper(payload, { paymentId }) {
        return { ...payload, paymentId };
      },
    },
    {
      // The final step has nothing to compensate.
      name: 'send-notification',
      serviceUrl: '/api/notification/send',
      compensationUrl: null,
    },
  ],
};
可靠消息方案
通过消息队列实现最终一致性:
/**
 * Reliable-message helper: records every outgoing message in a local store,
 * runs the caller's local transaction, publishes to the queue, and offers a
 * retry job plus a consumer wrapper with a status-based duplicate check.
 */
class ReliableMessageService {
  /**
   * @param {object} messageQueue - Queue client exposing `send`, `subscribe`,
   *   `republish` and `sendDeadLetter`.
   * @param {object} localTransaction - Exposes `beginTransaction()` returning
   *   an object with `commit()` and `rollback()`.
   * @param {object} [messageModel] - Message store exposing `create`,
   *   `updateOne`, `findOne` and `find(...).limit(n)`. Optional for backward
   *   compatibility; when omitted, `this.MessageModel` must be assigned by the
   *   caller before any method is used.
   */
  constructor(messageQueue, localTransaction, messageModel) {
    this.mq = messageQueue;
    this.Transaction = localTransaction;
    if (messageModel !== undefined) {
      this.MessageModel = messageModel;
    }
  }

  /**
   * Persists the message, runs the local business transaction, then publishes.
   *
   * Outcomes:
   *  - local phase fails  -> transaction rolled back, record marked
   *    'rolled_back' (never delivered), error rethrown;
   *  - publish phase fails -> the committed transaction is left alone, record
   *    marked 'failed' so retryPendingMessages re-sends it, error rethrown.
   *
   * @param {string} topic
   * @param {object} message - Message body; sent as `{ messageId, ...message }`.
   * @param {Function} localTransactionCallback - Called with the transaction handle.
   * @returns {Promise<{messageId: string, status: string}>}
   */
  async sendReliableMessage(topic, message, localTransactionCallback) {
    const messageId = uuidv4();
    const messageRecord = {
      id: messageId,
      topic,
      content: JSON.stringify(message),
      status: 'pending',
      createdAt: new Date(),
      retryCount: 0,
    };

    const transaction = await this.Transaction.beginTransaction();

    // Phase 1: message record + local business logic + commit.
    try {
      // NOTE(review): the record is created outside `transaction`; if the
      // store supports it, enlist this write in the same transaction.
      await this.MessageModel.create(messageRecord);
      await localTransactionCallback(transaction);
      await transaction.commit();
    } catch (error) {
      await transaction.rollback();
      // The business change did not happen, so this message must never be
      // delivered: use a status the retry job does not select.
      await this.MessageModel.updateOne(
        { id: messageId },
        { status: 'rolled_back', error: error.message }
      );
      throw error;
    }

    // Phase 2: publish. The local transaction is already committed, so a
    // failure here must not attempt a rollback.
    try {
      await this.mq.send(topic, {
        messageId,
        ...message,
      });
      await this.MessageModel.updateOne(
        { id: messageId },
        { status: 'sent', sentAt: new Date() }
      );
    } catch (error) {
      await this.MessageModel.updateOne(
        { id: messageId },
        { status: 'failed', error: error.message }
      );
      throw error;
    }

    return { messageId, status: 'success' };
  }

  /**
   * Subscribes to `topic` and wraps `handler` with status bookkeeping.
   *
   * Messages whose record is already 'consumed' are skipped. On handler
   * failure the retry counter is incremented and the message is republished
   * (fewer than 3 recorded retries) or dead-lettered, then the error is rethrown.
   *
   * @param {string} topic
   * @param {Function} handler - Async business handler receiving the message.
   * @returns {Promise<void>}
   */
  async consumeMessage(topic, handler) {
    await this.mq.subscribe(topic, async (message) => {
      const messageId = message.messageId;

      // Duplicate check: skip messages that were already fully processed.
      const existingMessage = await this.MessageModel.findOne({ id: messageId });
      if (existingMessage?.status === 'consumed') {
        console.log(`Message ${messageId} already consumed, skip`);
        return;
      }

      await this.MessageModel.updateOne(
        { id: messageId },
        { status: 'processing', processedAt: new Date() }
      );

      try {
        await handler(message);
        await this.MessageModel.updateOne(
          { id: messageId },
          { status: 'consumed', consumedAt: new Date() }
        );
      } catch (error) {
        await this.MessageModel.updateOne(
          { id: messageId },
          {
            status: 'consume_failed',
            error: error.message,
            $inc: { retryCount: 1 },
          }
        );

        // Re-read the record to get the incremented retry counter.
        // NOTE(review): assumes a record for this id exists in the consumer's
        // store; confirm against the deployment.
        const msg = await this.MessageModel.findOne({ id: messageId });
        if (msg.retryCount < 3) {
          await this.mq.republish(topic, message);
        } else {
          await this.mq.sendDeadLetter(topic, message);
        }
        throw error;
      }
    });
  }

  /**
   * Re-sends messages that are still pending or failed and republishes
   * messages whose consumption failed.
   *
   * @param {{maxRetries?: number, batchSize?: number}} [options]
   * @returns {Promise<{processed: number}>} Number of messages selected.
   */
  async retryPendingMessages(options = {}) {
    const { maxRetries = 5, batchSize = 100 } = options;
    const pendingMessages = await this.MessageModel.find({
      status: { $in: ['pending', 'failed', 'consume_failed'] },
      retryCount: { $lt: maxRetries },
    }).limit(batchSize);

    for (const message of pendingMessages) {
      try {
        // Same shape as the first delivery: the consumer's duplicate check
        // depends on `messageId` being present.
        const body = { messageId: message.id, ...JSON.parse(message.content) };

        if (message.status === 'pending' || message.status === 'failed') {
          await this.mq.send(message.topic, body);
        } else if (message.status === 'consume_failed') {
          await this.mq.republish(message.topic, body);
        }

        await this.MessageModel.updateOne(
          { id: message.id },
          {
            status: 'sent',
            retryCount: message.retryCount + 1,
            lastRetryAt: new Date(),
          }
        );
      } catch (error) {
        // One bad message must not stop the batch.
        console.error(`Retry message ${message.id} failed:`, error);
      }
    }

    return { processed: pendingMessages.length };
  }
}
总结
分布式事务是微服务架构的核心挑战,选择合适的事务方案至关重要:
- TCC 模式:性能高,适合业务可控的场景,需要改造业务接口
- Saga 模式:适合长流程业务,通过补偿实现最终一致
- 可靠消息:适合异步解耦场景,实现简单,注意幂等性
- Seata 框架:提供 AT、TCC、Saga 等多种模式,一站式解决方案
在实际项目中,需要根据业务特点和技术团队能力选择合适的方案。