spring-tx source

Table of Contents

spring-tx 解析

feat-2024/11/12 延迟加载connection的方法

前言

  • 事务在oltp的关键性不必多说,使用orm又不想手动设置,掌握spring-tx是关键的
  • 本文主要针对spring-tx的源码解析

延迟加载

@Transactional的长事务的优化,但是仅限于某种简单场景,如下

	@Transactional
	public void doXX(){
		// 耗时操作
		rpc();
		// ops 1
		lambdaUpdate
			.set()
			.update();
		// ops 2	
		lambdaUpdate
			.set()
			.update();	
	}

实际上就是,在getConnection的时候,不直接获取,延迟到method,invoke()的时候,不需要自行实现,只需要使用spring提供的LazyConnectionDataSourceProxy,简单看下实现思路

// 执行器
private class LazyConnectionInvocationHandler implements InvocationHandler{
	// ...
	@Override
	@Nullable
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
		//截取方法的一部分
		// Target Connection already fetched,
			// or target Connection necessary for current operation ->
			// invoke method on target connection.
			try {
				// 这里在方法的执行时才会获取target
				return method.invoke(getTargetConnection(method), args);
			}
			catch (InvocationTargetException ex) {
				throw ex.getTargetException();
			}
	}
}

如何引入?其实就是在注入DataSource的时候采用LazyConnectionDataSourceProxy,手动配置下

@Bean
public DataSource dataSource(){
    HikariConfig hikariConfig = new HikariConfig();
    hikariConfig.setAutoCommit(false);
    hikariConfig.setDataSource(dataSourceProvider().dataSource());
    HikariDataSource poolingDataSource = new HikariDataSource(hikariConfig);

    ChainListener listener = new ChainListener();
    SLF4JQueryLoggingListener loggingListener = new SLF4JQueryLoggingListener();
    loggingListener.setQueryLogEntryCreator(new InlineQueryLogEntryCreator());
    listener.addListener(loggingListener);
    listener.addListener(new DataSourceQueryCountListener());
    DataSource loggingDataSource = ProxyDataSourceBuilder
        .create(poolingDataSource())
        .name(DATA_SOURCE_PROXY_NAME)
        .listener(listener)
        .build();

    return new LazyConnectionDataSourceProxy(loggingDataSource);
}

mysql

mysql的事务隔离级别存在 rr,rc,ru,serializable,除非在session中设置 transaction_isolation,不然隔离等级是不会变的 不同的隔离级别出现的问题不是重点,应用层考虑事务必须考虑到可重入性,对于mysql来说,保证事务的隔离性足够了,但是它不能控制client行为,如果一段代码内,先开启长事务,然后开启一些小事务,很有可能对某些数据并发很高,如果在客户端解决这个问题,对client和server都提升效率
解决并发的最好的方法就是没有并发

基础

spring-tx的isolationmysql保持一致

public enum Isolation {
	DEFAULT(TransactionDefinition.ISOLATION_DEFAULT),
	READ_UNCOMMITTED(TransactionDefinition.ISOLATION_READ_UNCOMMITTED),
	READ_COMMITTED(TransactionDefinition.ISOLATION_READ_COMMITTED),
	REPEATABLE_READ(TransactionDefinition.ISOLATION_REPEATABLE_READ),
	SERIALIZABLE(TransactionDefinition.ISOLATION_SERIALIZABLE);
	/...
}

spring抽象的 TransactionDefinition,就是对事务的抽象,包含5个属性 名称,隔离级别,超时时间,是否只读,传播机制

定义

public interface TransactionDefinition {
	//事务的传播行为
	int PROPAGATION_REQUIRED = 0;
	int PROPAGATION_SUPPORTS = 1;
	int PROPAGATION_MANDATORY = 2;
	int PROPAGATION_REQUIRES_NEW = 3;
	int PROPAGATION_NOT_SUPPORTED = 4;
	int PROPAGATION_NEVER = 5;
	int PROPAGATION_NESTED = 6;
	//隔离级别
	int ISOLATION_DEFAULT = -1;
	int ISOLATION_READ_UNCOMMITTED = 1;  
	int ISOLATION_READ_COMMITTED = 2;  
	int ISOLATION_REPEATABLE_READ = 4;  
	int ISOLATION_SERIALIZABLE = 8;  
	// 超时时间
	int TIMEOUT_DEFAULT = -1;

	/...
	//这里存在一些 default方法
}

传播机制是针对业务的,所以不同的方法的选择不同,隔离性是整个DB的

传播机制

7种隔离级别,最好自己看下源码,含义和名称基本相同,对于spring-tx,EQUIRED,REQUIRES_NEW,NESTED都是会将资源suspend,然后start事务, 如果当前存在事务,走的是传播策略,无设计模式,只有朴素的if-else

  1. PROPAGATION_NOT_SUPPORTED虽然不支持重入,但是内部的事务会以非事务的方式运行,可以理解为口头约束
  2. PROPAGATION_NEVER重入直接抛异常
  3. PROPAGATOIN_SUPPORT支持传播,但是外部无事务的时候,还是以非事务的方式运行,可以理解为继承,
  4. PROPAGATION_REQUIRED存在事务继承,不存在就创建,
  5. PROPAGATION_MANDATORY要求必须存在外部事务,不然抛异常
  6. PROPAGATION_NESTED如果存在事务,就在嵌套事务执行,否则行为上和PROPAGATION_REQUIRED相同
  7. PROPAGATION_REQUIRED_NEW,创建一个新事务,如果存在外部事务,suspend

使用

@Slf4j
@RequiredArgsConstructor
public class TXTest {
	/**
	 这里的transaction template 应该提前交给spring注入
		
	*/
    private final TransactionTemplate transactionTemplate;

    @Transactional(propagation = Propagation.SUPPORTS,rollbackFor = RuntimeException.class)
    public void do1(){

    }
    public void do2() {
        transactionTemplate.executeWithoutResult(status -> {
            log.info("do2");
        });
    }
    public void do3() {
    		//从这里不难看出 编程式事务的使用是十分方便的,不亚于@Transactional
    		//却比其拥有更好的性能,不会因为一些难易发现的使用错误导致事务失效
        transactionTemplate.execute(new TransactionCallback<Object>() {
            @Override
            public Object doInTransaction(TransactionStatus status) {
                log.info("do3");
                return null;
            }
        });
    }
}

自定义transaction template,其实这样存在一个问题,就是对于不同的传播策略,注入的话需要使用一个包装类

@Configuration
public class MyConfig  {
    
    @Bean
    public TransactionTemplate transactionTemplate(@NotNull PlatformTransactionManager transactionManager) {
        DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
        definition.setName("transactionTemplate");
        definition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
		// 支持传播
        definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS);
        definition.setTimeout(30000);
        definition.setReadOnly(false);
        return new TransactionTemplate(transactionManager, definition);
    } 
}

流程

目前只存在TramsactionTemplate的流程

  1. 尝试获取Transaction
  2. AbstractTransactionManager执行getTransaction()
  3. DatasourceTransactionManager执行doBegin()获取ConnectionHolder
  4. 如果是新事务获取新连接,否则更新ThreadLocal信息
  5. 业务逻辑action.doInTransaction()执行
  6. 异常执行 AbstractTransactionManager.rollback(status)
  7. DatasourceTransactionManager执行doRollback(…)
  8. 正常执行 AbstractTransactionManager。commit(status)
  9. DatasourceTransactionManager执行doCommit(…)
  10. AbstractTransactionManager收尾

整个链路存在不少triggerXXX,doBeforeXXX,doAfterXXX或者TransactionExecuteListener的执行,对于事务传播也是不同策略,源码分析中十分明确,从中最大的收获便是对模板方法的学习

源码

这一阶段主要是对spring-tx出现的核心组件解析,version >= 6.1,如果只为了解可以关注DatasourceTransactionManager,这里没做基于@Transactional的事务织入,原因是了解了TransationTemlate后,如果对spring-aop了解的话,学习声明式事务是很容易的,这里给出核心的类 TransactionAspectSupport,TransactionInterceptor,无论是声明式事务还是编程式事务,核心的逻辑只有3个,GetTransaction commit,rollback,都是基于AbstracTransactionManager的流程的串行
其实源码的同步器我也不太理解作用是什么,主要是DataSourceManager里就是拿到了Connection,然后释放,加个todo,注意到是存在一些埋点逻辑的,所以以后可以基于这个做满sql告警
注意之后的阅读可能会花费不少时间
todo Synchronization机制

TransactionManager

事务管理的顶层接口,实现传统事务和响应式事务

public interface TransactionManager {
}
// 事务管理器的接口
public interface PlatformTransactionManager extends TransactionManager {

	TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;
	void commit(TransactionStatus status) throws TransactionException;
	void rollback(TransactionStatus status) throws TransactionException;
}

TransactionSynchronizationManager

事务同步管理,理解为一个context,依据thread建立的,提供transaction信息对thread的绑定,一个事务中执行的资源,需要确定

// abstract保证不能被实例化,多个threadlocal对象存储线程绑定信息
public abstract class TransactionSynchronizationManager {

	private static final ThreadLocal<Map<Object, Object>> resources =
			new NamedThreadLocal<>("Transactional resources");
	// 同步器,一个事务可能多次需要挂起
	private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
			new NamedThreadLocal<>("Transaction synchronizations");
	// 名称
	private static final ThreadLocal<String> currentTransactionName =
			new NamedThreadLocal<>("Current transaction name");
	//是否可读
	private static final ThreadLocal<Boolean> currentTransactionReadOnly =
			new NamedThreadLocal<>("Current transaction read-only status");
	//等级
	private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
			new NamedThreadLocal<>("Current transaction isolation level");
	//是否存活
	private static final ThreadLocal<Boolean> actualTransactionActive =
			new NamedThreadLocal<>("Actual transaction active");

	/..存在Getter和Setter,已省略
	/**
	对资源的回收,在get和bind中都存在检查
	*/
	// 获取资源的同时检查是否被设置为无效
	@Nullable
	private static Object doGetResource(Object actualKey) {
		Map<Object, Object> map = resources.get();
		if (map == null) {
			return null;
		}
		Object value = map.get(actualKey);
		// resourceholder被设置为无效
		if (value instanceof ResourceHolder resourceHolder && resourceHolder.isVoid()) {
			map.remove(actualKey);
			// Remove entire ThreadLocal if empty...
			if (map.isEmpty()) {
				resources.remove();
			}
			value = null;
		}
		return value;
	}
	// 绑定资源
	public static void bindResource(Object key, Object value) throws IllegalStateException {
		Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
		Assert.notNull(value, "Value must not be null");
		Map<Object, Object> map = resources.get();
		// set ThreadLocal Map if none found
		if (map == null) {
			map = new HashMap<>();
			resources.set(map);
		}
		Object oldValue = map.put(actualKey, value);
		// 如果说旧值已经被标记无效了,那么直接回收
		if (oldValue instanceof ResourceHolder resourceHolder && resourceHolder.isVoid()) {
			// 删除强引用,gc时候回收
			oldValue = null;
		}
		// 存在旧值且未被标记
		if (oldValue != null) {
			throw new IllegalStateException(
					"Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread");
		}
	}
	//解除资源绑定
	@Nullable
	private static Object doUnbindResource(Object actualKey) {
		Map<Object, Object> map = resources.get();
		if (map == null) {
			return null;
		}
		Object value = map.remove(actualKey);
		// Remove entire ThreadLocal if empty...
		if (map.isEmpty()) {
			resources.remove();
		}
		// Transparently suppress a ResourceHolder that was marked as void...
		if (value instanceof ResourceHolder resourceHolder && resourceHolder.isVoid()) {
			value = null;
		}
		return value;
	}
	//返回同步是否存活
	public static boolean isSynchronizationActive() {
		return (synchronizations.get() != null);
	}
	//同步初始化
	public static void initSynchronization() throws IllegalStateException {
		if (isSynchronizationActive()) {
			throw new IllegalStateException("Cannot activate transaction synchronization - already active");
		}
		//linkedhashset保证了插入顺序,基于hash表
		synchronizations.set(new LinkedHashSet<>());
	}

	//注册同步器
	public static void registerSynchronization(TransactionSynchronization synchronization)
			throws IllegalStateException {

		Assert.notNull(synchronization, "TransactionSynchronization must not be null");
		Set<TransactionSynchronization> synchs = synchronizations.get();
		if (synchs == null) {
			throw new IllegalStateException("Transaction synchronization is not active");
		}
		synchs.add(synchronization);
	}
	//返回同步数据的快照
	public static List<TransactionSynchronization> getSynchronizations() throws IllegalStateException {
		Set<TransactionSynchronization> synchs = synchronizations.get();
		if (synchs == null) {
			throw new IllegalStateException("Transaction synchronization is not active");
		}
		if (synchs.isEmpty()) {
			return Collections.emptyList();
		}
		else if (synchs.size() == 1) {
			return Collections.singletonList(synchs.iterator().next());
		}
		else {
			// Sort lazily here, not in registerSynchronization.
			List<TransactionSynchronization> sortedSynchs = new ArrayList<>(synchs);
			OrderComparator.sort(sortedSynchs);
			// unmodifiableList 实际上就是新建了一个final list
			return Collections.unmodifiableList(sortedSynchs);
		}
	}
	//删除所有同步器
	public static void clearSynchronization() throws IllegalStateException {
		// 存在同步,不可清除
		if (!isSynchronizationActive()) {
			throw new IllegalStateException("Cannot deactivate transaction synchronization - not active");
		}
		synchronizations.remove();
	}

	// 经典的threadlocal的回收问题
	public static void clear() {
		synchronizations.remove();
		currentTransactionName.remove();
		currentTransactionReadOnly.remove();
		currentTransactionIsolationLevel.remove();
		actualTransactionActive.remove();
	}

}

AbstractPlatformTransactionManager

  • PlatformTransactionManager的抽象实现
  • 出现了大量的trigger_xxx方法,都是对 TransactionSynchronization做处理,比如回收,最终都会集中在TransactionSynchronizationUtils这个工具类中,这里就不展开,spring6.1引入了 TransactionExecutionListener,主要是为了统计,我猜测的可能是统计事务的执行量,成功比例之类的,相当于事务各个阶段执行的切面
  • 对事务存在resume,suspend,.etc操作,实际上是对TransactionSynchronization的resume,suspend,同理此类实现,此处不展开
// 学习模板方法的最佳框架
public abstract class AbstractPlatformTransactionManager
		implements PlatformTransactionManager, ConfigurableTransactionManager, Serializable {

	private Collection<TransactionExecutionListener> transactionExecutionListeners = new ArrayList<>();

    /... 存在批量的getter和 setter
   /**
    *模板方法,也就是实现类需要给出的实现
    * */
	//@paul mark: 获取事务
	protected abstract Object doGetTransaction() throws TransactionException;
	// @paul mark: 开启事务
	protected abstract void doBegin(Object transaction, TransactionDefinition definition)
			throws TransactionException;
	// @paul mark: 挂起事务
	protected Object doSuspend(Object transaction) throws TransactionException {
		throw new TransactionSuspensionNotSupportedException(
				"Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
	}
	// @paul mark: 恢复事务
	protected void doResume(@Nullable Object transaction, Object suspendedResources) throws TransactionException {
		throw new TransactionSuspensionNotSupportedException(
				"Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
	}
	// @paul mark: 准备提交
	protected void prepareForCommit(DefaultTransactionStatus status) {
	}
	// @paul mark: 提交事务
	protected abstract void doCommit(DefaultTransactionStatus status) throws TransactionException;
	// @paul mark: 回滚事务
	protected abstract void doRollback(DefaultTransactionStatus status) throws TransactionException;
    // @paul mark: 设置回滚
	protected void doSetRollbackOnly(DefaultTransactionStatus status) throws TransactionException {
		throw new IllegalTransactionStateException(
				"Participating in existing transactions is not supported - when 'isExistingTransaction' " +
				"returns true, appropriate 'doSetRollbackOnly' behavior must be provided");
	}
    // @paul mark: 事务提交后处理,清理资源
	protected void doCleanupAfterCompletion(Object transaction) {
	}

    //检查当前事务是否和传入的这个事务对象是同一个
	protected boolean isExistingTransaction(Object transaction) throws TransactionException {
		return false;
	}
	protected boolean useSavepointForNestedTransaction() {
		return true;
	}
	
	//只有事务状态是 未同步状态才会执行
	protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
		if (status.isNewSynchronization()) {
			TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
			TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
					definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
							definition.getIsolationLevel() : null);
			TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
			TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
			TransactionSynchronizationManager.initSynchronization();
		}
	}
	//获取超时时间
	protected int determineTimeout(TransactionDefinition definition) {
		if (definition.getTimeout() != TransactionDefinition.TIMEOUT_DEFAULT) {
			return definition.getTimeout();
		}
		return getDefaultTimeout();
	}
	//挂起当前事务,比如你要在一个事务内操作redis,mq .etc的操作或者事务重入
    //理论上进入一个事务也算非事务操作,所以已存在事务的情况下,也会suspend
	@Nullable
	protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
	
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
			List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
			try {
				Object suspendedResources = null;
				if (transaction != null) {
					// 做suspend的时候,如果transaction active则suspend失败
					suspendedResources = doSuspend(transaction);
				}
				//数据快照
                // 数据从线程的 threadlocal里面拿
				String name = TransactionSynchronizationManager.getCurrentTransactionName();
				TransactionSynchronizationManager.setCurrentTransactionName(null);
				boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
				TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
				Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
				boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
                //设置死亡
				TransactionSynchronizationManager.setActualTransactionActive(false);
				return new SuspendedResourcesHolder(
						suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
			}
			catch (RuntimeException | Error ex) {
				// doSuspend failed - original transaction is still active...
				doResumeSynchronization(suspendedSynchronizations);
				throw ex;
			}
		}
		else if (transaction != null) {
			// Transaction active but no synchronization active.
			Object suspendedResources = doSuspend(transaction);
			return new SuspendedResourcesHolder(suspendedResources);
		}
		else {
			// Neither transaction nor synchronization active.
			return null;
		}
	}

	// 恢复事务
	protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
			throws TransactionException {

		if (resourcesHolder != null) {
			Object suspendedResources = resourcesHolder.suspendedResources;
			if (suspendedResources != null) {
				//具体实现
				doResume(transaction, suspendedResources);
			}
			List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
			if (suspendedSynchronizations != null) {
				// 恢复事务同步
				TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
				TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
				TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
				TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
				doResumeSynchronization(suspendedSynchronizations);
			}
		}
	}


  // 事务结束后的收尾
	private void cleanupAfterCompletion(DefaultTransactionStatus status) {
		status.setCompleted();
		if (status.isNewSynchronization()) {
            // 新事务清理同步资源
			TransactionSynchronizationManager.clear();
		}
		if (status.isNewTransaction()) {
			doCleanupAfterCompletion(status.getTransaction());
		}
		if (status.getSuspendedResources() != null) {
			if (status.isDebug()) {
				logger.debug("Resuming suspended transaction after completion of inner transaction");
			}
			Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
            // 唤起外部事务
			resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
		}
	}
	
	// resource 的持有对象,静态内部类
	protected static final class SuspendedResourcesHolder {

		@Nullable
		private final Object suspendedResources;

		@Nullable
		private List<TransactionSynchronization> suspendedSynchronizations;

		@Nullable
		private String name;

		private boolean readOnly;

		@Nullable
		private Integer isolationLevel;

		private boolean wasActive;
		/..存在Getter和Setter
	}

}

commit

事务提交入口

    //代理
	@Override
	public final void commit(TransactionStatus status) throws TransactionException {
		// 已经提交,幂等
		if (status.isCompleted()) {
			throw new IllegalTransactionStateException(
					"Transaction is already completed - do not call commit or rollback more than once per transaction");
		}	
		//如果事务明确标记为回滚
		DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
		if (defStatus.isLocalRollbackOnly()) {
			if (defStatus.isDebug()) {
				logger.debug("Transactional code has requested rollback");
			}
			processRollback(defStatus, false);
			return;
		}
        // shouldCommitOnGlobalRollbackOnly 默认为false,
        // 不应该在全局回滚策略下commit && 被设置为全局回滚
		if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
			if (defStatus.isDebug()) {
				logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
			}
			processRollback(defStatus, true);
			return;
		}
		//执行commit
		processCommit(defStatus);
	}

说明:TransactionStatus 实际上是spring-tx对事务的抽象

	//处理事务提交
	private void processCommit(DefaultTransactionStatus status) throws TransactionException {
		try {
			boolean beforeCompletionInvoked = false;
			boolean commitListenerInvoked = false;

			try {
				boolean unexpectedRollback = false;
				prepareForCommit(status);
				triggerBeforeCommit(status);
				triggerBeforeCompletion(status);
				beforeCompletionInvoked = true;
                //NESTED 事务传播
				if (status.hasSavepoint()) {
					if (status.isDebug()) {
						logger.debug("Releasing transaction savepoint");
					}
					unexpectedRollback = status.isGlobalRollbackOnly();
					this.transactionExecutionListeners.forEach(listener -> listener.beforeCommit(status));
					commitListenerInvoked = true;
					status.releaseHeldSavepoint();
				}
                //新事务
				else if (status.isNewTransaction()) {
					if (status.isDebug()) {
						logger.debug("Initiating transaction commit");
					}
					unexpectedRollback = status.isGlobalRollbackOnly();
					this.transactionExecutionListeners.forEach(listener -> listener.beforeCommit(status));
					commitListenerInvoked = true;
					// 调用具体实现
					doCommit(status);
				}
				else if (isFailEarlyOnGlobalRollbackOnly()) {
					unexpectedRollback = status.isGlobalRollbackOnly();
				}

				// Throw UnexpectedRollbackException if we have a global rollback-only
				// marker but still didn't get a corresponding exception from commit.
				if (unexpectedRollback) {
					throw new UnexpectedRollbackException(
							"Transaction silently rolled back because it has been marked as rollback-only");
				}
			}
			catch (UnexpectedRollbackException ex) {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
				this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, null));
				throw ex;
			}
			catch (TransactionException ex) {
				if (isRollbackOnCommitFailure()) {
					doRollbackOnCommitException(status, ex);
				}
				else {
					triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
					if (commitListenerInvoked) {
						this.transactionExecutionListeners.forEach(listener -> listener.afterCommit(status, ex));
					}
				}
				throw ex;
			}
			catch (RuntimeException | Error ex) {
				if (!beforeCompletionInvoked) {
					triggerBeforeCompletion(status);
				}
				doRollbackOnCommitException(status, ex);
				throw ex;
			}

			// Trigger afterCommit callbacks, with an exception thrown there
			// propagated to callers but the transaction still considered as committed.
			try {
				triggerAfterCommit(status);
			}
			finally {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
				if (commitListenerInvoked) {
					this.transactionExecutionListeners.forEach(listener -> listener.afterCommit(status, null));
				}
			}

		}
		finally {
            //清理
			cleanupAfterCompletion(status);
		}
	}

rollback

spring 6.1加入了TransactionExecutionListeners,目前只有test实现

    //代理
	@Override
	public final void rollback(TransactionStatus status) throws TransactionException {
		if (status.isCompleted()) {
			throw new IllegalTransactionStateException(
					"Transaction is already completed - do not call commit or rollback more than once per transaction");
		}

		DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
		processRollback(defStatus, false);
	}

	//执行回滚逻辑
	private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
		try {
			boolean unexpectedRollback = unexpected;
			boolean rollbackListenerInvoked = false;

			try {
				triggerBeforeCompletion(status);
                // NESTED事务传播行为
				if (status.hasSavepoint()) {
					if (status.isDebug()) {
						logger.debug("Rolling back transaction to savepoint");
					}
					this.transactionExecutionListeners.forEach(listener -> listener.beforeRollback(status));
					rollbackListenerInvoked = true;
					status.rollbackToHeldSavepoint();
				}
				else if (status.isNewTransaction()) {
					if (status.isDebug()) {
						logger.debug("Initiating transaction rollback");
					}
                    // 数据统计
					this.transactionExecutionListeners.forEach(listener -> listener.beforeRollback(status));
					rollbackListenerInvoked = true;
                    //执行回滚
					doRollback(status);
				}
				else {
                    // 存在长事务
					if (status.hasTransaction()) {
                        // 被设置为回滚
						if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
							if (status.isDebug()) {
								logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
							}
                            //仅回滚当前事务
							doSetRollbackOnly(status);
						}
						else {
							if (status.isDebug()) {
								logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
							}
						}
					}
					else {
						logger.debug("Should roll back transaction but cannot - no transaction available");
					}
					// Unexpected rollback only matters here if we're asked to fail early
					if (!isFailEarlyOnGlobalRollbackOnly()) {
						unexpectedRollback = false;
					}
				}
			}
			catch (RuntimeException | Error ex) {
                // 同步资源回收
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
				if (rollbackListenerInvoked) {
					this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, ex));
				}
				throw ex;
			}
            // 同步资源回收
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
			if (rollbackListenerInvoked) {
				this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, null));
			}

			// Raise UnexpectedRollbackException if we had a global rollback-only marker
			if (unexpectedRollback) {
				throw new UnexpectedRollbackException(
						"Transaction rolled back because it has been marked as rollback-only");
			}
		}
		finally {
			cleanupAfterCompletion(status);
		}
	}
  

getTransaction

  • 事务执行入口,串联事务执行
  • 内部存在一个方法prepareTransactionStatus实际就是对ThreadLocal里的值更新
  • 此方法存在对事务传播行为的判断,可以理解为主要的逻辑控制器
	// 对TransactionOperations的实现
	@Override
	public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
			throws TransactionException {

		// Use defaults if no transaction definition given.
		TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
		// 获取当前线程的事务
		Object transaction = doGetTransaction();
		boolean debugEnabled = logger.isDebugEnabled();
		//判断是否存在事务
		if (isExistingTransaction(transaction)) {
			// 如果存在事务,就要根据事务传播机制handle
			return handleExistingTransaction(def, transaction, debugEnabled);
		}
		// 检查事务的超时时间,不能小于默认值
		if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
			throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
		}
		// mandatory策略不允许外部无事务
		if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
			throw new IllegalTransactionStateException(
					"No existing transaction found for transaction marked with propagation 'mandatory'");
		}
		else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
				// suspend 当前资源
			SuspendedResourcesHolder suspendedResources = suspend(null);
			if (debugEnabled) {
				logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
			}
			try {
				return startTransaction(def, transaction, false, debugEnabled, suspendedResources);
			}
			catch (RuntimeException | Error ex) {
				resume(null, suspendedResources);
				throw ex;
			}
		}
		else {
			// Create "empty" transaction: no actual transaction, but potentially synchronization.
			if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
						"isolation level will effectively be ignored: " + def);
			}
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
		}
	}

实际开启一个事务


    //开启一个事务
	private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
			boolean nested, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

		boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
		DefaultTransactionStatus status = newTransactionStatus(
				definition, transaction, true, newSynchronization, nested, debugEnabled, suspendedResources);
        //事务开启切面
		this.transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status));
		try {
            // 不同实现的具体逻辑
            // 注意这里是捕获开启事务的异常,不是事务执行过程的异常
			doBegin(transaction, definition);
		}
		catch (RuntimeException | Error ex) {
			this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex));
			throw ex;
		}
		prepareSynchronization(status, definition);
        //事务开启后切面
		this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null));
		return status;
	}
	// 处理重入,可以认为是事务处理的第2条线
	private TransactionStatus handleExistingTransaction(
			TransactionDefinition definition, Object transaction, boolean debugEnabled)
			throws TransactionException {
		// NEVER策略不支持事务重入
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
			throw new IllegalTransactionStateException(
					"Existing transaction found for transaction marked with propagation 'never'");
		}

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
			if (debugEnabled) {
				logger.debug("Suspending current transaction");
			}
			Object suspendedResources = suspend(transaction);
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(
					definition, null, false, newSynchronization, debugEnabled, suspendedResources);
		}
        //开启新事务,suspend当前事务的resourceHolder,交给新事务,如果开启事务失败,回滚resume
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
			if (debugEnabled) {
				logger.debug("Suspending current transaction, creating new transaction with name [" +
						definition.getName() + "]");
			}
			SuspendedResourcesHolder suspendedResources = suspend(transaction);
			try {
				return startTransaction(definition, transaction, false, debugEnabled, suspendedResources);
			}
			catch (RuntimeException | Error beginEx) {
				resumeAfterBeginException(transaction, suspendedResources, beginEx);
				throw beginEx;
			}
		}
        //NESTED传播行为
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			if (!isNestedTransactionAllowed()) {
				throw new NestedTransactionNotSupportedException(
						"Transaction manager does not allow nested transactions by default - " +
						"specify 'nestedTransactionAllowed' property with value 'true'");
			}
			if (debugEnabled) {
				logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
			}
            //todo savepoint机制
			if (useSavepointForNestedTransaction()) {
				// Create savepoint within existing Spring-managed transaction,
				// through the SavepointManager API implemented by TransactionStatus.
				// Usually uses JDBC savepoints. Never activates Spring synchronization.
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, false, false, true, debugEnabled, null);
				this.transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status));
				try {
					status.createAndHoldSavepoint();
				}
				catch (RuntimeException | Error ex) {
					this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex));
					throw ex;
				}
				this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null));
				return status;
			}
			else {
				// Nested transaction through nested begin and commit/rollback calls.
				// Usually only for JTA: Spring synchronization might get activated here
				// in case of a pre-existing JTA transaction.
				return startTransaction(definition, transaction, true, debugEnabled, null);
			}
		}
        // 以下事务传播行为不加事务
		// PROPAGATION_REQUIRED, PROPAGATION_SUPPORTS, PROPAGATION_MANDATORY:
		// regular participation in existing transaction.
		if (debugEnabled) {
			logger.debug("Participating in existing transaction");
		}
		if (isValidateExistingTransaction()) {
			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
				Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
					throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] specifies isolation level which is incompatible with existing transaction: " +
							(currentIsolationLevel != null ?
									DefaultTransactionDefinition.getIsolationLevelName(currentIsolationLevel) :
									"(unknown)"));
				}
			}
			if (!definition.isReadOnly()) {
                // 当前事务只读,只读事务内只能包含只读事务
				if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
					throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] is not marked as read-only but existing transaction is");
				}
			}
		}
		boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
		return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
	}

TransactionTemplate

@Transactional使用不当会导致无事务运行,手动加事务就可以避免,但是直接使用 TransactionManager需要手动try catch和回滚操作,所以使用TransactionTemplate就可以简化事务的使用

/**
 * Template class that simplifies programmatic transaction demarcation and
 * transaction exception handling.
 */
@SuppressWarnings("serial")
public class TransactionTemplate extends DefaultTransactionDefinition
		implements TransactionOperations, InitializingBean {

	/** Logger available to subclasses. */
	protected final Log logger = LogFactory.getLog(getClass());
	@Nullable
	private PlatformTransactionManager transactionManager;
	
	/...
	@Override
	
    // 这个接口实际上是 TransactionOperations
	@Nullable
	public <T> T execute(TransactionCallback<T> action) throws TransactionException {
		
		Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");

		if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager cpptm) {
			return cpptm.execute(this, action);
		}
		else {
            // 获取事务并开启
			TransactionStatus status = this.transactionManager.getTransaction(this);
			T result;
			try {
				//注意这里不是执行事务
				result = action.doInTransaction(status);
			}
			// 捕获sql执行异常,可以预测的是这里是等待最长
			catch (RuntimeException | Error ex) {
				// Transactional code threw application exception -> rollback
				rollbackOnException(status, ex);
				throw ex;
			}
			catch (Throwable ex) {
				// Transactional code threw unexpected exception -> rollback
				rollbackOnException(status, ex);
				throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
			}
			//提交
			this.transactionManager.commit(status);
			return result;
		}
	}

	/**
	 * Perform a rollback, handling rollback exceptions properly.
	 */
	private void rollbackOnException(TransactionStatus status, Throwable ex) throws TransactionException {
		// release的时候断言是off的,不必担心
		Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");

		logger.debug("Initiating transaction rollback on application exception", ex);
		try {
			this.transactionManager.rollback(status);
		}
		//处理异常rollback时候,在rollbakc的时候出现异常,比如使用try catch配合事务,catch到exception的时候
		//relase resource 或者log,然后抛出让spring-tx回滚,这时候,如果rollback异常
		//业务异常就会被 rollback exception overridden,相当于异常并发
		catch (TransactionSystemException ex2) {
			logger.error("Application exception overridden by rollback exception", ex);
			ex2.initApplicationException(ex);
			throw ex2;
		}
		catch (RuntimeException | Error ex2) {
	
			logger.error("Application exception overridden by rollback exception", ex);
			throw ex2;
		}
	}


	@Override
	public boolean equals(@Nullable Object other) {
		return (this == other || (super.equals(other) && (!(other instanceof TransactionTemplate template) ||
				// 需要计较 transactionManager实现
				getTransactionManager() == template.getTransactionManager())));
	}
}

DatasourceTransactionManager

  • 可以理解为JDBC的事务管理器,是对AbstractTransactionManager的实现,其中必须实现的一些方法逻辑很简单,重点关注doBegindoCleanupAfterCompletion
  • 存在DataSourceTransactionObject,就是对Jdbc的封装,顶层接口是ResourceHolder

@SuppressWarnings("serial")
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
		implements ResourceTransactionManager, InitializingBean {
        //注意这个datasource不包含redis,是jdbc
	@Nullable
	private DataSource dataSource;

    /...  @Override方法
    // 开启事务
	@Override
	protected void doBegin(Object transaction, TransactionDefinition definition) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		Connection con = null;

		try {
            // 重入
			if (!txObject.hasConnectionHolder() ||
					txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
				Connection newCon = obtainDataSource().getConnection();
				if (logger.isDebugEnabled()) {
					logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
				}
				txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
			}

			txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
			con = txObject.getConnectionHolder().getConnection();

			Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
			txObject.setPreviousIsolationLevel(previousIsolationLevel);
			txObject.setReadOnly(definition.isReadOnly());

            // 强制设置为 手动提交
			if (con.getAutoCommit()) {
				txObject.setMustRestoreAutoCommit(true);
				if (logger.isDebugEnabled()) {
					logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
				}
				con.setAutoCommit(false);
			}

			prepareTransactionalConnection(con, definition);
			txObject.getConnectionHolder().setTransactionActive(true);

			int timeout = determineTimeout(definition);
			if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
				txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
			}

			// Bind the connection holder to the thread.
			if (txObject.isNewConnectionHolder()) {
                //thread local 绑定
				TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
			}
		}

		catch (Throwable ex) {
			if (txObject.isNewConnectionHolder()) {
				DataSourceUtils.releaseConnection(con, obtainDataSource());
				txObject.setConnectionHolder(null, false);
			}
			throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
		}
	}

	@Override
	protected void doCleanupAfterCompletion(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        // 解锁资源,
		if (txObject.isNewConnectionHolder()) {
			TransactionSynchronizationManager.unbindResource(obtainDataSource());
		}

		// Reset connection.
		Connection con = txObject.getConnectionHolder().getConnection();
        //这里会判断是否强制改回 auto commit
		try {
			if (txObject.isMustRestoreAutoCommit()) {
				con.setAutoCommit(true);
			}
			DataSourceUtils.resetConnectionAfterTransaction(
					con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
		}
		catch (Throwable ex) {
			logger.debug("Could not reset JDBC Connection after transaction", ex);
		}

		if (txObject.isNewConnectionHolder()) {
			if (logger.isDebugEnabled()) {
				logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
			}
            //释放连接回连接池
			DataSourceUtils.releaseConnection(con, this.dataSource);
		}
        //删除ThreadLocal内容
		txObject.getConnectionHolder().clear();
	}
    /...
}