p2p网站建设石家庄,东莞网络营销策划培训,随州抖音seo收费标准,国家企业信用公示信息年报全国1.场景
如何保证“本地数据库插入”与“调用第三方接口”这两个操作的原子性(要么都成功#xff0c;要么都失败)#xff0c;这是一个非常经典且常见的分布式事务场景。
2.方案一#xff1a;调整顺序 本地事务(适用于轻量级、对即时性要求不高的场景)
这是最简单且最推荐的方…1.场景如何保证“本地数据库插入”与“调用第三方接口”这两个操作的原子性(要么都成功要么都失败)这是一个非常经典且常见的分布式事务场景。2.方案一调整顺序 本地事务(适用于轻量级、对即时性要求不高的场景)这是最简单且最推荐的方案核心思想是先落库再同步。1.逻辑设计1.在本地数据库表中增加一个状态字段(例如 sync_status0-未同步1-已同步)。2.开启本地事务 Transactional。3.先将数据插入本地数据库标记状态为“未同步”。4.提交本地事务(此时数据已入库但未同步)。5.在事务提交后(或通过异步线程/消息队列)调用第三方接口。6.如果第三方调用成功更新本地数据库状态为“已同步”。2.异常处理(补偿机制)定时任务 编写一个定时任务(Scheduled Task)定期扫描数据库中状态为“未同步”且创建时间超过一定阈值的数据重新发起同步请求。优点 即使第三方接口挂了本地数据也不会丢失保证了最终一致性。缺点 存在短暂的数据不一致。示例代码// 1. 本地保存(事务内) Transactional(rollbackFor Exception.class) public void saveLocal(Data data) { data.setSyncStatus(UN_SYNC); mapper.insert(data); } // 2. 主业务逻辑 public void saveAndSync(Data data) { // 第一步先落库(保证本地数据安全) saveLocal(data); // 第二步尝试同步(即使这里失败了也不会回滚上面的saveLocal) try { boolean result thirdPartyClient.call(data); if (result) { // 同步成功更新状态 mapper.updateStatus(data.getId(), SYNCED); } } catch (Exception e) { log.error(同步失败等待定时任务补偿, e); // 这里吞掉异常不要影响主流程返回成功 } } // 3. 另外编写一个定时任务 (例如每5分钟执行一次) Scheduled(cron 0 0/5 * * * ?) public void retrySyncTask() { ListData pendingData mapper.selectUnSyncData(); for (Data data : pendingData) { // 重新调用第三方成功后更新状态 // 建议设置最大重试次数超过后人工介入 } }3.方案二利用消息队列(MQ)实现最终一致性(适用于高并发场景)如果您的系统已经引入了消息队列(如 RabbitMQ, RocketMQ, Kafka)可以使用可靠消息服务。1.逻辑设计本地事务内 插入业务数据同时插入一条“待发送消息”记录到一张本地消息表(Local Message Table)这两步在同一个数据库事务中保证百分百成功。异步发送 这一步有多种实现方式方式A(轮询) 定时任务扫描本地消息表投递到 MQ。方式B(事务消息) 如果使用 RocketMQ可以直接利用其“事务消息”特性。消费端 消费者监听 MQ收到消息后调用第三方接口。确认机制 只有第三方接口调用成功才确认消费消息(ACK)如果失败MQ 会重试。2.优点 解耦了业务逻辑和第三方调用系统吞吐量高。3.注意 消费者端需要做好幂等性处理(防止第三方接口被重复调用)。示例代码1.核心业务类(生产端)这里是关键业务数据入库和本地消息入库必须在同一个 Transactional 事务中。Service public class UserService { Autowired private UserMapper userMapper; Autowired private LocalMessageMapper messageMapper; Autowired private RabbitMQService rabbitMQService; // 封装的MQ发送服务 /** * 注册用户并触发同步 */ Transactional(rollbackFor Exception.class) // 开启本地事务 public void registerUser(User user) { // 1. 插入业务数据 userMapper.insert(user); // 2. 组装消息内容 String msgId UUID.randomUUID().toString(); UserSyncMsg msgContent new UserSyncMsg(user.getId(), user.getName()); String json JSON.toJSONString(msgContent); // 3. 插入本地消息表 (状态为 0-待发送) // 这步非常关键它保证了如果数据库回滚消息记录也会回滚如果提交消息记录一定存在。 LocalMessage localMsg new LocalMessage(); localMsg.setId(msgId); localMsg.setMsgContent(json); localMsg.setExchange(user.exchange); localMsg.setRoutingKey(user.sync.crm); localMsg.setStatus(0); messageMapper.insert(localMsg); // 4. 发送MQ消息 (这步其实可以异步或者放在事务提交后的回调中这里为了简单直接调用) // 注意即使这里发MQ失败报错也不要抛出异常导致事务回滚。 // 因为我们有定时任务兜底(见第4步)。 try { rabbitMQService.send(msgId, json); // 发送成功更新本地消息状态为 1-已发送 messageMapper.updateStatus(msgId, 1); } catch (Exception e) { log.error(MQ发送失败等待定时任务补偿: {}, msgId); // 这里吞掉异常不要影响主业务注册成功 } } }2.消费者监听 (Consumer)消费者负责调用第三方接口。如果失败利用 MQ 的重试机制或死信队列。Component RabbitListener(queues user.sync.queue) public class UserSyncConsumer { Autowired private ThirdPartyCrmClient crmClient; RabbitHandler public void process(String msgJson, Channel channel, Header(AmqpHeaders.DELIVERY_TAG) long tag) { UserSyncMsg msg JSON.parseObject(msgJson, UserSyncMsg.class); try { // 1. 幂等性检查 (非常重要) // 调用三方接口前先查一下三方或者本地Redis确保这个userId没有被同步过。 // if (isSynced(msg.getUserId())) { channel.basicAck(tag, false); return; } // 2. 调用第三方接口 boolean success crmClient.syncUserToCrm(msg); if (success) { // 3. 成功手动确认消息 (ACK) channel.basicAck(tag, false); log.info(同步第三方成功: {}, msg.getUserId()); } else { // 4. 业务逻辑失败(比如参数校验不过)通常不再重试或者进入死信队列人工处理 log.error(第三方返回失败); channel.basicNack(tag, false, false); // 不重回队列转入死信或丢弃 } } catch (Exception e) { // 5. 网络抖动等异常拒绝消息并重回队列 (Requeue true) // 这样MQ过一会会再次推送这条消息 try { // 也可以结合重试次数判断如果重试太多次就丢进死信队列 channel.basicNack(tag, false, true); } catch (IOException ex) { ex.printStackTrace(); } } } }3.兜底定时任务 (补偿机制)这是“最终一致性”的保障。防止第2步中 rabbitMQService.send 失败(例如MQ挂了)导致本地消息表一直是“待发送”状态。Component public class MessageResendTask { Autowired private LocalMessageMapper messageMapper; Autowired private RabbitMQService rabbitMQService; // 每分钟扫描一次状态为 0 (待发送) 且创建时间超过1分钟的消息 Scheduled(fixedRate 60000) public void resendFailedMessages() { ListLocalMessage failedMsgs messageMapper.selectPendingMessages(); for (LocalMessage msg : failedMsgs) { if (msg.getRetryCount() 5) { // 超过最大重试次数标记为失败报警人工介入 messageMapper.updateStatus(msg.getId(), 2); continue; } try { rabbitMQService.send(msg.getId(), msg.getMsgContent()); // 发送成功更新状态 messageMapper.updateStatus(msg.getId(), 1); } catch (Exception e) { // 再次失败增加重试次数 messageMapper.incrementRetryCount(msg.getId()); } } } }4.方案总结这个 Demo 实现了以下逻辑闭环1.原子性 用户数据和消息记录在同一个数据库事务中同生共死。2.可靠性 即使第一遍发 MQ 失败定时任务会扫描本地消息表进行补发。3.最终一致性 消费者拿到消息后不断重试调用第三方直到成功(或进入死信队列)。4.解耦 注册操作极快不需要等待第三方接口响应。关键点提示消费者幂等性 第三方接口可能会被重复调用(比如消费者处理完了提交 ACK 时网络断了MQ 以为没成功又发了一遍)所以消费者内部或者第三方接口必须能处理重复请求。4.方案三最大努力通知(Best Effort Notification)如果您不想引入复杂的 MQ 或定时任务可以在代码层面做简单的“重试”。1.逻辑设计开启本地事务。插入数据库。注意 此时不要提交事务。调用第三方接口。如果调用成功 提交本地事务。如果调用失败 抛出异常回滚本地事务(此时本地数据也就没了)。2.重大缺陷长事务问题 网络请求耗时不可控会导致数据库连接被长时间占用严重影响数据库性能。“假失败”问题 如果第三方接口已经处理成功但返回响应时网络超时你的代码会认为失败并回滚本地数据导致第三方有数据而本地没有造成严重的数据不一致。因此 强烈不建议在 Transactional 事务代码块内部直接进行 HTTP/RPC 网络请求。5.方案四Seata 等分布式事务框架(TCC 模式)如果业务场景非常严格要求强一致性(几乎同时成功或失败)可以使用分布式事务框架(如 Alibaba Seata)。1.逻辑设计(TCC模式)Try 预留资源(本地数据插入中间状态)。Confirm 确认提交(本地更新状态调用第三方接口正式处理)。Cancel 回滚(删除本地数据调用第三方接口的“取消/回滚”方法)。2.难点 这要求第三方接口必须配合您提供 Try, Confirm, Cancel 三个配套接口。大多数第三方系统并不支持这种模式。6.总结建议综合考虑开发成本和系统稳定性方案一(本地消息表定时任务补偿) 是性价比最高的选择。