news 2026/5/27 20:33:23

Spring 多线程事务:为什么回滚失效,怎么解决

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Spring 多线程事务:为什么回滚失效,怎么解决

@Transactional加上去,单线程没问题,一到多线程就废——部分数据入库了,部分没回滚,还不报错。我第一次碰到这个问题排查了半天,最后发现 Spring 事务是基于 ThreadLocal 的,子线程根本拿不到主线程的 Connection。

这篇文章把这个问题从头到尾讲清楚:事务怎么工作的,多线程下为什么失效,以及三种生产环境能用的方案。

一、事务是怎么跑起来的

先看一段最普通的代码:

@Service public class OrderService { @Transactional(rollbackFor = Exception.class) public void createOrder(Order order) { orderMapper.insert(order); accountMapper.deduct(order.getUserId(), order.getAmount()); } }

加了@Transactional,insert 和 deduct 就在同一个事务里了。但谁在管这个事务?不是你的 OrderService,是 Spring 在启动时给它包了一层代理。

代理对象长这样:

Client 调用 ↓ 代理对象(Proxy) ├── TransactionInterceptor │ ├── 解析 @Transactional 的属性 │ ├── 开启事务(setAutoCommit=false) │ ├── 反射调用 createOrder() │ └── commit() 或 rollback() └── 真正的 OrderService

代理创建过程

@EnableTransactionManagement这个注解会触发TransactionManagementConfigurationSelector,往容器里注册两个东西:

  • AutoProxyRegistrar:注册InfrastructureAdvisorAutoProxyCreator,负责给 Bean 创建代理
  • ProxyTransactionManagementConfiguration:注册事务切面相关 Bean

然后TransactionAttributeSourcePointcut会扫描所有标注了@Transactional的方法:

// Spring 源码:TransactionAttributeSourcePointcut public boolean matches(Method method, Class<?> targetClass) { if (this.publicMethodsOnly && !Modifier.isPublic(method.getModifiers())) { return false; // 非 public 方法直接跳过 } TransactionAttributeSource tas = this.transactionAttributeSource; return (tas == null || tas.getTransactionAttribute(method, targetClass) != null); }

最后为包含事务方法的 Bean 创建 CGLIB 代理,织入TransactionInterceptor

运行时调用链路

核心源码简化下来就这些:

protected Object invokeWithinTransaction(MethodInvocation invocation) { TransactionAttribute txAttr = tas.getTransactionAttribute(method, targetClass); PlatformTransactionManager tm = getTransactionManager(txAttr); TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, method); Object retVal; try { retVal = invocation.proceed(); // 反射调用目标方法 } catch (Throwable ex) { completeTransactionAfterThrowing(txInfo, ex); // 异常 → 回滚或提交 throw ex; } commitTransactionAfterReturning(txInfo); // 正常 → 提交 return retVal; }

最关键的东西:ThreadLocal

整个事务机制靠TransactionSynchronizationManager撑着,它用 ThreadLocal 把 Connection 绑到当前线程:

public abstract class TransactionSynchronizationManager { private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources"); public static void bindResource(Object key, Object value) { Map<Object, Object> map = resources.get(); if (map == null) { map = new HashMap<>(); resources.set(map); } map.put(key, value); } public static Object getResource(Object key) { Map<Object, Object> map = resources.get(); if (map == null) return null; return map.get(key); } }

每个线程有自己独立的 Connection,事务自然也是独立的。记住这点,后面全跟它有关。

二、@Transactional 的几个关键属性

传播行为

用的最多的是前三个:

传播行为干嘛的什么时候用
REQUIRED(默认)有事务就加入,没有就新建90% 的场景
REQUIRES_NEW不管有没有,都开一个独立事务操作日志、消息记录,不想被外层事务影响
NESTED嵌套事务,用的是 savepoint子操作回滚不影响外层
SUPPORTS有就加入,没有就非事务跑查询方法
NOT_SUPPORTED非事务跑,有事务就挂起不需要事务的操作
MANDATORY必须在事务里,不然抛异常强制要求事务
NEVER不能在事务里,有就抛异常不允许事务

REQUIRED、REQUIRES_NEW、NESTED 三者的区别:

REQUIRED: ┌── 外层事务 ──────────────────────────┐ │ methodA() → methodB() │ 同一个事务,任一回滚全部回滚 │ (共享同一个 Connection) │ └─────────────────────────────────────┘ REQUIRES_NEW: ┌── 外层事务 ──────────┐ │ methodA() │ │ ┌── 新事务 ────────┐│ │ │ methodB() ││ 独立事务,互不影响 │ │ (独立Connection) ││ │ └─────────────────┘│ └─────────────────────┘ NESTED: ┌── 外层事务 ──────────────────────────┐ │ methodA() │ │ ┌── Savepoint ───────────────────┐ │ │ │ methodB() │ │ B回滚不影响A │ │ (共享Connection,有savepoint) │ │ │ └────────────────────────────────┘ │ └─────────────────────────────────────┘

回滚异常

Spring 默认只对 RuntimeException 和 Error 回滚,checked exception 不管:

// DefaultTransactionAttribute public boolean rollbackOn(Throwable ex) { return (ex instanceof RuntimeException || ex instanceof Error); }

所以@Transactional不加rollbackFor,抛 IOException 是不会回滚的。rollbackFor = Exception.class应该成为默认写法。

执行全链路

三、多线程事务为什么废了

直接看代码:

@Service public class OrderBatchService { @Autowired private OrderMapper orderMapper; @Autowired private ThreadPoolTaskExecutor executor; @Transactional(rollbackFor = Exception.class) public void batchCreateOrders(List<Order> orders) { for (Order order : orders) { executor.submit(() -> { orderMapper.insert(order); // 子线程执行 }); } if (orders.size() > 10) { throw new RuntimeException("批量处理失败"); } } }

主线程抛异常回滚了,子线程的数据已经在库里了。跟踪一下就明白:

主线程 Thread-main 子线程 Thread-pool-1 ────────────────── ───────────────────── ThreadLocal: {ds: Conn-A} ThreadLocal: {} ← 空的 Conn-A.setAutoCommit(false) 从连接池拿 Connection-B executor.submit(...) Connection-B.autoCommit=true INSERT → 直接提交了 throw RuntimeException rollback Conn-A Conn-B 早就提交了,回天无力

子线程从自己的 ThreadLocal 里取 Connection,取到的是空的,Spring 就给它新建一个。新 Connection 的 autoCommit 是 true,SQL 执行完直接入库,主线程的事务根本管不着。

四、怎么解决

方案一:编程式事务 + CountDownLatch

最简单也最常用。每个子线程自己管自己的事务,CountDownLatch 等所有线程跑完,AtomicBoolean 标记有没有失败的。

@Service @Slf4j public class BatchOrderService { @Autowired private PlatformTransactionManager transactionManager; @Autowired private OrderMapper orderMapper; @Autowired @Qualifier("batchExecutor") private ThreadPoolTaskExecutor executor; public BatchResult batchCreateOrders(List<Order> orders) { if (CollectionUtils.isEmpty(orders)) { return BatchResult.success(); } CountDownLatch latch = new CountDownLatch(orders.size()); AtomicBoolean hasError = new AtomicBoolean(false); List<BatchResultItem> results = Collections.synchronizedList(new ArrayList<>()); for (Order order : orders) { executor.submit(() -> { DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); TransactionStatus status = transactionManager.getTransaction(def); try { orderMapper.insert(order); orderMapper.insertLog(new OrderLog(order.getId(), "创建成功")); transactionManager.commit(status); results.add(BatchResultItem.success(order.getId())); } catch (Exception e) { hasError.set(true); transactionManager.rollback(status); results.add(BatchResultItem.fail(order.getId(), e.getMessage())); log.error("订单 [{}] 创建失败", order.getId(), e); } finally { latch.countDown(); } }); } try { boolean completed = latch.await(30, TimeUnit.SECONDS); if (!completed) { throw new BusinessException("批量处理超时,部分订单未完成"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new BusinessException("批量处理被中断"); } if (hasError.get()) { List<String> failedIds = results.stream() .filter(r -> !r.isSuccess()) .map(BatchResultItem::getBizId) .collect(Collectors.toList()); throw new BusinessException("部分订单处理失败: " + failedIds); } return BatchResult.success(results); } }

返回结果封装:

@Data public class BatchResult { private boolean success; private List<BatchResultItem> items; private String message; public static BatchResult success() { BatchResult r = new BatchResult(); r.setSuccess(true); r.setItems(Collections.emptyList()); return r; } public static BatchResult success(List<BatchResultItem> items) { BatchResult r = new BatchResult(); r.setSuccess(true); r.setItems(items); return r; } } @Data @AllArgsConstructor public class BatchResultItem { private String bizId; private boolean success; private String message; public static BatchResultItem success(String bizId) { return new BatchResultItem(bizId, true, "成功"); } public static BatchResultItem fail(String bizId, String message) { return new BatchResultItem(bizId, false, message); } }

这个方案没法做到全回滚——已经 commit 的线程撤不回来。适合日志、通知、数据同步这种丢几条问题不大的场景。

方案二:手动管理连接,最后统一提交或回滚

思路是所有子线程先跑,跑完先不提交,等主线程统一决定。

@Service @Slf4j public class ManualConnectionService { @Autowired private DataSource dataSource; @Autowired private OrderMapper orderMapper; @Autowired @Qualifier("batchExecutor") private ThreadPoolTaskExecutor executor; public void batchWithUnifiedCommit(List<Order> orders) { CountDownLatch latch = new CountDownLatch(orders.size()); AtomicBoolean hasError = new AtomicBoolean(false); List<ConnectionHolder> holders = Collections.synchronizedList(new ArrayList<>()); for (Order order : orders) { executor.submit(() -> { Connection conn = null; try { conn = DataSourceUtils.getConnection(dataSource); conn.setAutoCommit(false); holders.add(new ConnectionHolder(conn, order.getId())); TransactionSynchronizationManager.bindResource( dataSource, new ConnectionHolder(conn)); orderMapper.insert(order); orderMapper.insertLog(new OrderLog(order.getId(), "处理中")); } catch (Exception e) { hasError.set(true); log.error("订单 [{}] 处理失败", order.getId(), e); } finally { TransactionSynchronizationManager.unbindResource(dataSource); DataSourceUtils.releaseConnection(conn, dataSource); latch.countDown(); } }); } try { latch.await(60, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("批量处理被中断"); } for (ConnectionHolder holder : holders) { try (Connection conn = holder.getConnection()) { if (hasError.get()) { conn.rollback(); } else { conn.commit(); } } catch (SQLException e) { log.error("连接操作失败: {}", holder.getBizId(), e); } } if (hasError.get()) { throw new RuntimeException("批量处理存在失败项,已全部回滚"); } } @Data @AllArgsConstructor private static class ConnectionHolder { private Connection connection; private String bizId; } }

说实话这个方案我在线上不敢用。latch.await()期间所有 Connection 都被占着,数据量一大连接池就耗尽了。只有数据量小(百条以内)且对一致性有要求的时候才考虑。

方案三:本地消息表

生产环境最推荐的方案。把批量任务拆成单条消息,主事务写入业务数据和消息记录,子任务异步消费,失败自动重试。

先建表:

CREATE TABLE batch_task_message ( id BIGINT PRIMARY KEY AUTO_INCREMENT, batch_id VARCHAR(64) NOT NULL COMMENT '批次ID', biz_id VARCHAR(64) NOT NULL COMMENT '业务ID', status VARCHAR(16) NOT NULL DEFAULT 'PENDING' COMMENT 'PENDING/PROCESSING/SUCCESS/FAILED', content TEXT NOT NULL COMMENT '任务内容(JSON)', retry_count INT NOT NULL DEFAULT 0 COMMENT '已重试次数', max_retry INT NOT NULL DEFAULT 3 COMMENT '最大重试次数', error_msg VARCHAR(512) COMMENT '错误信息', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_status (status), INDEX idx_batch_id (batch_id), INDEX idx_biz_id (biz_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='批量任务消息表';

主事务写入——业务数据和消息表在同一个事务里,要么一起成功要么一起回滚:

@Service public class BatchTaskSubmitService { @Autowired private OrderMapper orderMapper; @Autowired private BatchTaskMessageMapper messageMapper; @Transactional(rollbackFor = Exception.class) public String submitBatchTask(List<Order> orders) { String batchId = IdUtil.simpleUUID(); for (Order order : orders) { order.setStatus("PENDING"); orderMapper.insert(order); BatchTaskMessage message = new BatchTaskMessage(); message.setBatchId(batchId); message.setBizId(order.getId().toString()); message.setContent(JSON.toJSONString(order)); message.setStatus("PENDING"); messageMapper.insert(message); } return batchId; } }

异步处理,定时轮询:

@Service @Slf4j public class BatchTaskProcessService { @Autowired private BatchTaskMessageMapper messageMapper; @Autowired private OrderMapper orderMapper; @Autowired private PlatformTransactionManager transactionManager; @Autowired @Qualifier("batchExecutor") private ThreadPoolTaskExecutor executor; @Scheduled(fixedDelay = 3000) public void processPendingTasks() { List<BatchTaskMessage> tasks = messageMapper.selectPendingTasks(100); if (tasks.isEmpty()) return; CountDownLatch latch = new CountDownLatch(tasks.size()); for (BatchTaskMessage task : tasks) { executor.submit(() -> { DefaultTransactionDefinition def = new DefaultTransactionDefinition(); TransactionStatus status = transactionManager.getTransaction(def); try { messageMapper.updateStatus(task.getId(), "PROCESSING", null); Order order = JSON.parseObject(task.getContent(), Order.class); order.setStatus("COMPLETED"); orderMapper.updateById(order); messageMapper.updateStatus(task.getId(), "SUCCESS", null); transactionManager.commit(status); } catch (Exception e) { transactionManager.rollback(status); int retryCount = task.getRetryCount() + 1; if (retryCount >= task.getMaxRetry()) { messageMapper.updateStatus(task.getId(), "FAILED", e.getMessage().substring(0, Math.min(e.getMessage().length(), 512))); } else { messageMapper.updateRetryCount(task.getId(), retryCount); } } finally { latch.countDown(); } }); } try { latch.await(60, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } public BatchTaskProgress getProgress(String batchId) { int total = messageMapper.countByBatchId(batchId); int success = messageMapper.countByBatchIdAndStatus(batchId, "SUCCESS"); int failed = messageMapper.countByBatchIdAndStatus(batchId, "FAILED"); int pending = total - success - failed; return new BatchTaskProgress(batchId, total, success, failed, pending); } }

进度查询:

@Data @AllArgsConstructor public class BatchTaskProgress { private String batchId; private int total; private int success; private int failed; private int pending; public boolean isCompleted() { return pending == 0; } public boolean isAllSuccess() { return total == success; } }

这个方案的好处是天然支持重试、进度可查、不阻塞连接。代价是多了一张消息表和定时任务,但换来的是可靠性。

五、方案怎么选

维度编程式事务手动管理连接本地消息表
一致性最终一致统一提交/回滚最终一致
回滚粒度仅失败线程全部连接单条任务
复杂度
性能连接被占着,中等
失败重试不支持不支持支持
进度追踪自己搞自己搞天然支持

我的建议:

  • 日志、通知、数据同步 → 方案一,够用
  • 对一致性有要求、数据量不大 → 方案二,但谨慎用
  • 业务数据操作,不能丢数据 → 方案三,生产首选
  • 跨服务跨库 → 上 Seata,但说实话大多数项目用不上

六、线程池配置

批处理线程池有个容易忽略的点:线程数不能超过数据库连接池大小。HikariCP 默认连接池才 10 个,你开 20 个线程跑批处理,一半线程在那等连接。

@Configuration public class BatchThreadPoolConfig { @Bean("batchExecutor") public ThreadPoolTaskExecutor batchExecutor() { int corePoolSize = Runtime.getRuntime().availableProcessors(); int maxPoolSize = corePoolSize * 2; ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(200); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("batch-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(60); executor.initialize(); return executor; } }

CallerRunsPolicy让主线程自己跑被拒绝的任务,起到限流作用。别用默认的AbortPolicy,任务被拒了直接抛异常,前面已经提交的任务不受影响,但你不知道有任务没跑。

大数据量记得分批:

public void processLargeBatch(List<Order> allOrders) { int batchSize = 50; Iterable<List<Order>> batches = Iterables.partition(allOrders, batchSize); int batchIndex = 0; for (List<Order> batch : batches) { batchIndex++; try { batchOrderService.batchCreateOrders(batch); } catch (Exception e) { log.error("第 {} 批处理失败", batchIndex, e); throw e; } } }

七、几个容易踩的坑

子线程异常被吞掉:

// 错:主线程不知道子线程挂了 executor.submit(() -> { try { orderMapper.insert(order); } catch (Exception e) { log.error("失败", e); } latch.countDown(); }); // 对:AtomicBoolean 标记 executor.submit(() -> { try { orderMapper.insert(order); } catch (Exception e) { hasError.set(true); log.error("失败", e); } finally { latch.countDown(); } });

CountDownLatch 无限等待:

// 错:可能永久阻塞 latch.await(); // 对:设超时 boolean completed = latch.await(30, TimeUnit.SECONDS); if (!completed) { throw new BusinessException("处理超时"); }

事务注解加错位置:

// 错:@Transactional 在主方法上,对子线程无效 @Transactional(rollbackFor = Exception.class) public void batchCreateOrders(List<Order> orders) { for (Order order : orders) { executor.submit(() -> orderMapper.insert(order)); } } // 对:去掉主方法的注解,子线程内用编程式事务 public void batchCreateOrders(List<Order> orders) { for (Order order : orders) { executor.submit(() -> { TransactionStatus status = transactionManager.getTransaction( new DefaultTransactionDefinition()); try { orderMapper.insert(order); transactionManager.commit(status); } catch (Exception e) { transactionManager.rollback(status); } }); } }

这几个坑都是我自己踩过的。尤其是第一个,子线程异常被吞掉,线上日志看着一切正常,数据就是对不上,排查了半天才发现。


多线程事务的本质是个分布式一致性问题。单机场景下本地消息表已经够用;真要跨服务跨库,再考虑 Seata。别上来就追求强一致性,大多数业务根本不需要。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/27 20:33:20

清华大学thuthesis论文模板:5步快速掌握专业论文排版

清华大学thuthesis论文模板&#xff1a;5步快速掌握专业论文排版 【免费下载链接】thuthesis LaTeX Thesis Template for Tsinghua University 项目地址: https://gitcode.com/gh_mirrors/th/thuthesis 清华大学thuthesis LaTeX论文模板是清华大学官方发布的学位论文排版…

作者头像 李华
网站建设 2026/5/27 20:31:44

天赐范式第55天:5月11日18:00-19:00更新病毒库导致我python命令拒绝访问,是否需要关闭Windows Defender触发我对应上尼采哲学“越是向往高处的阳光,根就越要伸向黑暗”

天赐范式&#xff1a;当微软更新了病毒库导致我的python命令不能用了,提示拒绝访问&#xff0c;是否要关闭Windows Defender才能继续使用python命令&#xff0c;这件事情的背后&#xff0c;是否对应上了尼采的哲学“越是向往高处的阳光&#xff0c;根就越要伸向黑暗”阿里云百炼…

作者头像 李华
网站建设 2026/5/27 20:31:10

告别杂乱地图!QGIS图层顺序与符号化管理的3个核心技巧

告别杂乱地图&#xff01;QGIS图层顺序与符号化管理的3个核心技巧当你第一次在QGIS中叠加多个图层时&#xff0c;是否曾被混乱的视觉效果困扰&#xff1f;道路压盖了建筑轮廓&#xff0c;标注与点位重叠&#xff0c;色彩冲突让地图失去专业感。这并非数据问题&#xff0c;而是图…

作者头像 李华
网站建设 2026/5/27 20:30:12

2026亲测10款降AIGC软件红黑榜!优缺点无保留曝光,达标率对标顶级水准

2026 年&#xff0c;AI 写稿、AI 生成内容已经成了学生党、打工人和内容创作者的日常&#xff0c;但随之而来的「AI 率过高」问题也成了新的麻烦&#xff1a;论文查重 AI 率超标、职场报告被判定 AI 生成、自媒体内容过不了平台原创审核… 为了帮大家解决这个痛点&#xff0c;我…

作者头像 李华
网站建设 2026/5/27 20:28:08

CUDA内核融合优化:实现50ms延迟的流式TTS推理

1. 项目概述&#xff1a;让单个CUDA内核“开口说话”最近我完成了一个挺有意思的尝试&#xff1a;让一个单独的CUDA内核&#xff0c;直接驱动一个完整的文本转语音模型进行流式推理&#xff0c;最终在RTX 5090上实现了端到端延迟稳定在50毫秒左右。这个项目听起来有点“疯狂”&…

作者头像 李华