spring-tx source
Table of Contents
spring-tx 解析
feat-2024/11/12
延迟加载connection的方法
前言
- 事务在
oltp
的关键性不必多说,使用orm又不想手动设置,掌握spring-tx
是关键的 - 本文主要针对spring-tx的源码解析
延迟加载
对@Transactional
的长事务的优化,但是仅限于某种简单场景,如下
@Transactional
public void doXX(){
// 耗时操作
rpc();
// ops 1
lambdaUpdate
.set()
.update();
// ops 2
lambdaUpdate
.set()
.update();
}
实际上就是,在getConnection
的时候,不直接获取,延迟到method,invoke()
的时候,不需要自行实现,只需要使用spring
提供的LazyConnectionDataSourceProxy
,简单看下实现思路
// 执行器
private class LazyConnectionInvocationHandler implements InvocationHandler{
// ...
@Override
@Nullable
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//截取方法的一部分
// Target Connection already fetched,
// or target Connection necessary for current operation ->
// invoke method on target connection.
try {
// 这里在方法的执行时才会获取target
return method.invoke(getTargetConnection(method), args);
}
catch (InvocationTargetException ex) {
throw ex.getTargetException();
}
}
}
如何引入?其实就是在注入DataSource
的时候采用LazyConnectionDataSourceProxy
,手动配置下
@Bean
public DataSource dataSource(){
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setAutoCommit(false);
hikariConfig.setDataSource(dataSourceProvider().dataSource());
HikariDataSource poolingDataSource = new HikariDataSource(hikariConfig);
⠀
ChainListener listener = new ChainListener();
SLF4JQueryLoggingListener loggingListener = new SLF4JQueryLoggingListener();
loggingListener.setQueryLogEntryCreator(new InlineQueryLogEntryCreator());
listener.addListener(loggingListener);
listener.addListener(new DataSourceQueryCountListener());
DataSource loggingDataSource = ProxyDataSourceBuilder
.create(poolingDataSource())
.name(DATA_SOURCE_PROXY_NAME)
.listener(listener)
.build();
⠀
return new LazyConnectionDataSourceProxy(loggingDataSource);
}
mysql
mysql的事务隔离级别存在 rr,rc,ru,serializable
,除非在session中设置 transaction_isolation,不然隔离等级是不会变的
不同的隔离级别出现的问题不是重点,应用层考虑事务必须考虑到可重入性,对于mysql来说,保证事务的隔离性足够了,但是它不能控制client
行为,如果一段代码内,先开启长事务,然后开启一些小事务,很有可能对某些数据并发很高,如果在客户端解决这个问题,对client和server都提升效率
解决并发的最好的方法就是没有并发
基础
spring-tx的isolation
和mysql
保持一致
public enum Isolation {
DEFAULT(TransactionDefinition.ISOLATION_DEFAULT),
READ_UNCOMMITTED(TransactionDefinition.ISOLATION_READ_UNCOMMITTED),
READ_COMMITTED(TransactionDefinition.ISOLATION_READ_COMMITTED),
REPEATABLE_READ(TransactionDefinition.ISOLATION_REPEATABLE_READ),
SERIALIZABLE(TransactionDefinition.ISOLATION_SERIALIZABLE);
/...
}
spring抽象的 TransactionDefinition
,就是对事务的抽象,包含5个属性 名称,隔离级别,超时时间,是否只读,传播机制
定义
public interface TransactionDefinition {
//事务的传播行为
int PROPAGATION_REQUIRED = 0;
int PROPAGATION_SUPPORTS = 1;
int PROPAGATION_MANDATORY = 2;
int PROPAGATION_REQUIRES_NEW = 3;
int PROPAGATION_NOT_SUPPORTED = 4;
int PROPAGATION_NEVER = 5;
int PROPAGATION_NESTED = 6;
//隔离级别
int ISOLATION_DEFAULT = -1;
int ISOLATION_READ_UNCOMMITTED = 1;
int ISOLATION_READ_COMMITTED = 2;
int ISOLATION_REPEATABLE_READ = 4;
int ISOLATION_SERIALIZABLE = 8;
// 超时时间
int TIMEOUT_DEFAULT = -1;
/...
//这里存在一些 default方法
}
传播机制是针对业务的,所以不同的方法的选择不同,隔离性是整个DB的
传播机制
7种隔离级别,最好自己看下源码,含义和名称基本相同,对于spring-tx,EQUIRED,REQUIRES_NEW,NESTED
都是会将资源suspend,然后start事务,
如果当前存在事务,走的是传播策略,无设计模式,只有朴素的if-else
PROPAGATION_NOT_SUPPORTED
虽然不支持重入,但是内部的事务会以非事务的方式运行,可以理解为口头约束PROPAGATION_NEVER
重入直接抛异常PROPAGATOIN_SUPPORT
支持传播,但是外部无事务的时候,还是以非事务的方式运行,可以理解为继承,PROPAGATION_REQUIRED
存在事务继承,不存在就创建,PROPAGATION_MANDATORY
要求必须存在外部事务,不然抛异常PROPAGATION_NESTED
如果存在事务,就在嵌套事务执行,否则行为上和PROPAGATION_REQUIRED
相同PROPAGATION_REQUIRED_NEW
,创建一个新事务,如果存在外部事务,suspend
使用
@Slf4j
@RequiredArgsConstructor
public class TXTest {
/**
这里的transaction template 应该提前交给spring注入
*/
private final TransactionTemplate transactionTemplate;
@Transactional(propagation = Propagation.SUPPORTS,rollbackFor = RuntimeException.class)
public void do1(){
}
public void do2() {
transactionTemplate.executeWithoutResult(status -> {
log.info("do2");
});
}
public void do3() {
//从这里不难看出 编程式事务的使用是十分方便的,不亚于@Transactional
//却比其拥有更好的性能,不会因为一些难易发现的使用错误导致事务失效
transactionTemplate.execute(new TransactionCallback<Object>() {
@Override
public Object doInTransaction(TransactionStatus status) {
log.info("do3");
return null;
}
});
}
}
自定义transaction template
,其实这样存在一个问题,就是对于不同的传播策略,注入的话需要使用一个包装类
@Configuration
public class MyConfig {
@Bean
public TransactionTemplate transactionTemplate(@NotNull PlatformTransactionManager transactionManager) {
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
definition.setName("transactionTemplate");
definition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
// 支持传播
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS);
definition.setTimeout(30000);
definition.setReadOnly(false);
return new TransactionTemplate(transactionManager, definition);
}
}
流程
目前只存在TramsactionTemplate
的流程
- 尝试获取Transaction
- AbstractTransactionManager执行getTransaction()
- DatasourceTransactionManager执行doBegin()获取ConnectionHolder
- 如果是新事务获取新连接,否则更新ThreadLocal信息
- 业务逻辑action.doInTransaction()执行
- 异常执行 AbstractTransactionManager.rollback(status)
- DatasourceTransactionManager执行doRollback(…)
- 正常执行 AbstractTransactionManager。commit(status)
- DatasourceTransactionManager执行doCommit(…)
- AbstractTransactionManager收尾
整个链路存在不少triggerXXX
,doBeforeXXX
,doAfterXXX
或者TransactionExecuteListener
的执行,对于事务传播也是不同策略,源码分析中十分明确,从中最大的收获便是对模板方法
的学习
源码
这一阶段主要是对spring-tx出现的核心组件解析,version >= 6.1
,如果只为了解可以关注DatasourceTransactionManager
,这里没做基于@Transactional
的事务织入,原因是了解了TransationTemlate
后,如果对spring-aop
了解的话,学习声明式事务是很容易的,这里给出核心的类
TransactionAspectSupport
,TransactionInterceptor
,无论是声明式事务还是编程式事务,核心的逻辑只有3个,GetTransaction
commit
,rollback
,都是基于AbstracTransactionManager
的流程的串行
其实源码的同步器我也不太理解作用是什么,主要是DataSourceManager
里就是拿到了Connection,然后释放,加个todo,注意到是存在一些埋点逻辑的,所以以后可以基于这个做满sql告警
注意之后的阅读可能会花费不少时间
todo Synchronization机制
TransactionManager
事务管理的顶层接口,实现传统事务和响应式事务
public interface TransactionManager {
}
// 事务管理器的接口
public interface PlatformTransactionManager extends TransactionManager {
TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;
void commit(TransactionStatus status) throws TransactionException;
void rollback(TransactionStatus status) throws TransactionException;
}
TransactionSynchronizationManager
事务同步管理,理解为一个context,依据thread建立的,提供transaction信息对thread的绑定,一个事务中执行的资源,需要确定
// abstract保证不能被实例化,多个threadlocal对象存储线程绑定信息
public abstract class TransactionSynchronizationManager {
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
// 同步器,一个事务可能多次需要挂起
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
// 名称
private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");
//是否可读
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");
//等级
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<>("Current transaction isolation level");
//是否存活
private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");
/..存在Getter和Setter,已省略
/**
对资源的回收,在get和bind中都存在检查
*/
// 获取资源的同时检查是否被设置为无效
@Nullable
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// resourceholder被设置为无效
if (value instanceof ResourceHolder resourceHolder && resourceHolder.isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
// 绑定资源
public static void bindResource(Object key, Object value) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Assert.notNull(value, "Value must not be null");
Map<Object, Object> map = resources.get();
// set ThreadLocal Map if none found
if (map == null) {
map = new HashMap<>();
resources.set(map);
}
Object oldValue = map.put(actualKey, value);
// 如果说旧值已经被标记无效了,那么直接回收
if (oldValue instanceof ResourceHolder resourceHolder && resourceHolder.isVoid()) {
// 删除强引用,gc时候回收
oldValue = null;
}
// 存在旧值且未被标记
if (oldValue != null) {
throw new IllegalStateException(
"Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread");
}
}
//解除资源绑定
@Nullable
private static Object doUnbindResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
// Transparently suppress a ResourceHolder that was marked as void...
if (value instanceof ResourceHolder resourceHolder && resourceHolder.isVoid()) {
value = null;
}
return value;
}
//返回同步是否存活
public static boolean isSynchronizationActive() {
return (synchronizations.get() != null);
}
//同步初始化
public static void initSynchronization() throws IllegalStateException {
if (isSynchronizationActive()) {
throw new IllegalStateException("Cannot activate transaction synchronization - already active");
}
//linkedhashset保证了插入顺序,基于hash表
synchronizations.set(new LinkedHashSet<>());
}
//注册同步器
public static void registerSynchronization(TransactionSynchronization synchronization)
throws IllegalStateException {
Assert.notNull(synchronization, "TransactionSynchronization must not be null");
Set<TransactionSynchronization> synchs = synchronizations.get();
if (synchs == null) {
throw new IllegalStateException("Transaction synchronization is not active");
}
synchs.add(synchronization);
}
//返回同步数据的快照
public static List<TransactionSynchronization> getSynchronizations() throws IllegalStateException {
Set<TransactionSynchronization> synchs = synchronizations.get();
if (synchs == null) {
throw new IllegalStateException("Transaction synchronization is not active");
}
if (synchs.isEmpty()) {
return Collections.emptyList();
}
else if (synchs.size() == 1) {
return Collections.singletonList(synchs.iterator().next());
}
else {
// Sort lazily here, not in registerSynchronization.
List<TransactionSynchronization> sortedSynchs = new ArrayList<>(synchs);
OrderComparator.sort(sortedSynchs);
// unmodifiableList 实际上就是新建了一个final list
return Collections.unmodifiableList(sortedSynchs);
}
}
//删除所有同步器
public static void clearSynchronization() throws IllegalStateException {
// 存在同步,不可清除
if (!isSynchronizationActive()) {
throw new IllegalStateException("Cannot deactivate transaction synchronization - not active");
}
synchronizations.remove();
}
// 经典的threadlocal的回收问题
public static void clear() {
synchronizations.remove();
currentTransactionName.remove();
currentTransactionReadOnly.remove();
currentTransactionIsolationLevel.remove();
actualTransactionActive.remove();
}
}
AbstractPlatformTransactionManager
- 对
PlatformTransactionManager
的抽象实现 - 出现了大量的
trigger_xxx
方法,都是对TransactionSynchronization
做处理,比如回收,最终都会集中在TransactionSynchronizationUtils
这个工具类中,这里就不展开,spring6.1引入了TransactionExecutionListener
,主要是为了统计,我猜测的可能是统计事务的执行量,成功比例之类的,相当于事务各个阶段执行的切面 - 对事务存在resume,suspend,.etc操作,实际上是对
TransactionSynchronization
的resume,suspend,同理此类实现,此处不展开
// 学习模板方法的最佳框架
public abstract class AbstractPlatformTransactionManager
implements PlatformTransactionManager, ConfigurableTransactionManager, Serializable {
private Collection<TransactionExecutionListener> transactionExecutionListeners = new ArrayList<>();
/... 存在批量的getter和 setter
/**
*模板方法,也就是实现类需要给出的实现
* */
//@paul mark: 获取事务
protected abstract Object doGetTransaction() throws TransactionException;
// @paul mark: 开启事务
protected abstract void doBegin(Object transaction, TransactionDefinition definition)
throws TransactionException;
// @paul mark: 挂起事务
protected Object doSuspend(Object transaction) throws TransactionException {
throw new TransactionSuspensionNotSupportedException(
"Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
}
// @paul mark: 恢复事务
protected void doResume(@Nullable Object transaction, Object suspendedResources) throws TransactionException {
throw new TransactionSuspensionNotSupportedException(
"Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
}
// @paul mark: 准备提交
protected void prepareForCommit(DefaultTransactionStatus status) {
}
// @paul mark: 提交事务
protected abstract void doCommit(DefaultTransactionStatus status) throws TransactionException;
// @paul mark: 回滚事务
protected abstract void doRollback(DefaultTransactionStatus status) throws TransactionException;
// @paul mark: 设置回滚
protected void doSetRollbackOnly(DefaultTransactionStatus status) throws TransactionException {
throw new IllegalTransactionStateException(
"Participating in existing transactions is not supported - when 'isExistingTransaction' " +
"returns true, appropriate 'doSetRollbackOnly' behavior must be provided");
}
// @paul mark: 事务提交后处理,清理资源
protected void doCleanupAfterCompletion(Object transaction) {
}
//检查当前事务是否和传入的这个事务对象是同一个
protected boolean isExistingTransaction(Object transaction) throws TransactionException {
return false;
}
protected boolean useSavepointForNestedTransaction() {
return true;
}
//只有事务状态是 未同步状态才会执行
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
//获取超时时间
protected int determineTimeout(TransactionDefinition definition) {
if (definition.getTimeout() != TransactionDefinition.TIMEOUT_DEFAULT) {
return definition.getTimeout();
}
return getDefaultTimeout();
}
//挂起当前事务,比如你要在一个事务内操作redis,mq .etc的操作或者事务重入
//理论上进入一个事务也算非事务操作,所以已存在事务的情况下,也会suspend
@Nullable
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
// 做suspend的时候,如果transaction active则suspend失败
suspendedResources = doSuspend(transaction);
}
//数据快照
// 数据从线程的 threadlocal里面拿
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
//设置死亡
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}
// 恢复事务
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
throws TransactionException {
if (resourcesHolder != null) {
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
//具体实现
doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
// 恢复事务同步
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
doResumeSynchronization(suspendedSynchronizations);
}
}
}
// 事务结束后的收尾
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
status.setCompleted();
if (status.isNewSynchronization()) {
// 新事务清理同步资源
TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
if (status.isDebug()) {
logger.debug("Resuming suspended transaction after completion of inner transaction");
}
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
// 唤起外部事务
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
// resource 的持有对象,静态内部类
protected static final class SuspendedResourcesHolder {
@Nullable
private final Object suspendedResources;
@Nullable
private List<TransactionSynchronization> suspendedSynchronizations;
@Nullable
private String name;
private boolean readOnly;
@Nullable
private Integer isolationLevel;
private boolean wasActive;
/..存在Getter和Setter
}
}
commit
事务提交入口
//代理
@Override
public final void commit(TransactionStatus status) throws TransactionException {
// 已经提交,幂等
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
//如果事务明确标记为回滚
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
// shouldCommitOnGlobalRollbackOnly 默认为false,
// 不应该在全局回滚策略下commit && 被设置为全局回滚
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
//执行commit
processCommit(defStatus);
}
说明:TransactionStatus
实际上是spring-tx
对事务的抽象
//处理事务提交
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
boolean commitListenerInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
//NESTED 事务传播
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
this.transactionExecutionListeners.forEach(listener -> listener.beforeCommit(status));
commitListenerInvoked = true;
status.releaseHeldSavepoint();
}
//新事务
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
this.transactionExecutionListeners.forEach(listener -> listener.beforeCommit(status));
commitListenerInvoked = true;
// 调用具体实现
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, null));
throw ex;
}
catch (TransactionException ex) {
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
if (commitListenerInvoked) {
this.transactionExecutionListeners.forEach(listener -> listener.afterCommit(status, ex));
}
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
if (commitListenerInvoked) {
this.transactionExecutionListeners.forEach(listener -> listener.afterCommit(status, null));
}
}
}
finally {
//清理
cleanupAfterCompletion(status);
}
}
rollback
spring 6.1加入了TransactionExecutionListeners
,目前只有test实现
//代理
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}
//执行回滚逻辑
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
boolean rollbackListenerInvoked = false;
try {
triggerBeforeCompletion(status);
// NESTED事务传播行为
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
this.transactionExecutionListeners.forEach(listener -> listener.beforeRollback(status));
rollbackListenerInvoked = true;
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// 数据统计
this.transactionExecutionListeners.forEach(listener -> listener.beforeRollback(status));
rollbackListenerInvoked = true;
//执行回滚
doRollback(status);
}
else {
// 存在长事务
if (status.hasTransaction()) {
// 被设置为回滚
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
//仅回滚当前事务
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
// 同步资源回收
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
if (rollbackListenerInvoked) {
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, ex));
}
throw ex;
}
// 同步资源回收
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
if (rollbackListenerInvoked) {
this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, null));
}
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
cleanupAfterCompletion(status);
}
}
getTransaction
- 事务执行入口,串联事务执行
- 内部存在一个方法
prepareTransactionStatus
实际就是对ThreadLocal
里的值更新 - 此方法存在对事务传播行为的判断,可以理解为主要的逻辑控制器
// 对TransactionOperations的实现
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 获取当前线程的事务
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
//判断是否存在事务
if (isExistingTransaction(transaction)) {
// 如果存在事务,就要根据事务传播机制handle
return handleExistingTransaction(def, transaction, debugEnabled);
}
// 检查事务的超时时间,不能小于默认值
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// mandatory策略不允许外部无事务
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// suspend 当前资源
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
return startTransaction(def, transaction, false, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
实际开启一个事务
//开启一个事务
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean nested, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, nested, debugEnabled, suspendedResources);
//事务开启切面
this.transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status));
try {
// 不同实现的具体逻辑
// 注意这里是捕获开启事务的异常,不是事务执行过程的异常
doBegin(transaction, definition);
}
catch (RuntimeException | Error ex) {
this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex));
throw ex;
}
prepareSynchronization(status, definition);
//事务开启后切面
this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null));
return status;
}
// 处理重入,可以认为是事务处理的第2条线
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// NEVER策略不支持事务重入
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
//开启新事务,suspend当前事务的resourceHolder,交给新事务,如果开启事务失败,回滚resume
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
return startTransaction(definition, transaction, false, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
//NESTED传播行为
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
//todo savepoint机制
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, false, false, true, debugEnabled, null);
this.transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status));
try {
status.createAndHoldSavepoint();
}
catch (RuntimeException | Error ex) {
this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex));
throw ex;
}
this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null));
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
return startTransaction(definition, transaction, true, debugEnabled, null);
}
}
// 以下事务传播行为不加事务
// PROPAGATION_REQUIRED, PROPAGATION_SUPPORTS, PROPAGATION_MANDATORY:
// regular participation in existing transaction.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
DefaultTransactionDefinition.getIsolationLevelName(currentIsolationLevel) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
// 当前事务只读,只读事务内只能包含只读事务
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
TransactionTemplate
@Transactional
使用不当会导致无事务运行,手动加事务就可以避免,但是直接使用 TransactionManager
需要手动try catch和回滚操作,所以使用TransactionTemplate
就可以简化事务的使用
/**
* Template class that simplifies programmatic transaction demarcation and
* transaction exception handling.
*/
@SuppressWarnings("serial")
public class TransactionTemplate extends DefaultTransactionDefinition
implements TransactionOperations, InitializingBean {
/** Logger available to subclasses. */
protected final Log logger = LogFactory.getLog(getClass());
@Nullable
private PlatformTransactionManager transactionManager;
/...
@Override
// 这个接口实际上是 TransactionOperations
@Nullable
public <T> T execute(TransactionCallback<T> action) throws TransactionException {
Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager cpptm) {
return cpptm.execute(this, action);
}
else {
// 获取事务并开启
TransactionStatus status = this.transactionManager.getTransaction(this);
T result;
try {
//注意这里不是执行事务
result = action.doInTransaction(status);
}
// 捕获sql执行异常,可以预测的是这里是等待最长
catch (RuntimeException | Error ex) {
// Transactional code threw application exception -> rollback
rollbackOnException(status, ex);
throw ex;
}
catch (Throwable ex) {
// Transactional code threw unexpected exception -> rollback
rollbackOnException(status, ex);
throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
}
//提交
this.transactionManager.commit(status);
return result;
}
}
/**
* Perform a rollback, handling rollback exceptions properly.
*/
private void rollbackOnException(TransactionStatus status, Throwable ex) throws TransactionException {
// release的时候断言是off的,不必担心
Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
logger.debug("Initiating transaction rollback on application exception", ex);
try {
this.transactionManager.rollback(status);
}
//处理异常rollback时候,在rollbakc的时候出现异常,比如使用try catch配合事务,catch到exception的时候
//relase resource 或者log,然后抛出让spring-tx回滚,这时候,如果rollback异常
//业务异常就会被 rollback exception overridden,相当于异常并发
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
@Override
public boolean equals(@Nullable Object other) {
return (this == other || (super.equals(other) && (!(other instanceof TransactionTemplate template) ||
// 需要计较 transactionManager实现
getTransactionManager() == template.getTransactionManager())));
}
}
DatasourceTransactionManager
- 可以理解为
JDBC
的事务管理器,是对AbstractTransactionManager
的实现,其中必须实现的一些方法逻辑很简单,重点关注doBegin
和doCleanupAfterCompletion
- 存在
DataSourceTransactionObject
,就是对Jdbc的封装,顶层接口是ResourceHolder
@SuppressWarnings("serial")
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {
//注意这个datasource不包含redis,是jdbc
@Nullable
private DataSource dataSource;
/... @Override方法
// 开启事务
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
// 重入
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// 强制设置为 手动提交
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
//thread local 绑定
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
@Override
protected void doCleanupAfterCompletion(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 解锁资源,
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
// Reset connection.
Connection con = txObject.getConnectionHolder().getConnection();
//这里会判断是否强制改回 auto commit
try {
if (txObject.isMustRestoreAutoCommit()) {
con.setAutoCommit(true);
}
DataSourceUtils.resetConnectionAfterTransaction(
con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
}
catch (Throwable ex) {
logger.debug("Could not reset JDBC Connection after transaction", ex);
}
if (txObject.isNewConnectionHolder()) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
}
//释放连接回连接池
DataSourceUtils.releaseConnection(con, this.dataSource);
}
//删除ThreadLocal内容
txObject.getConnectionHolder().clear();
}
/...
}