1. 背景
一个主库和N个应用库的数据源,并且会同时操作主库和应用库的数据,需要解决以下两个问题:
- 如何动态管理多个数据源以及切换?
- 如何保证多数据源场景下的数据一致性(事务)?
本文主要探讨这两个问题的解决方案,希望能对读者有一定的启发。
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
- 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
- 视频教程:https://doc.iocoder.cn/video/
2. 数据源切换原理
通过扩展Spring提供的抽象类AbstractRoutingDataSource
,可以实现切换数据源。其类结构如下图所示:
- targetDataSources&defaultTargetDataSource
项目上需要使用的所有数据源和默认数据源。
- resolvedDataSources&resolvedDefaultDataSource
当Spring容器创建AbstractRoutingDataSource
对象时,通过调用afterPropertiesSet
复制上述目标数据源。由此可见,一旦数据源实例对象创建完毕,业务无法再添加新的数据源。
- determineCurrentLookupKey
此方法为抽象方法,通过扩展这个方法来实现数据源的切换。目标数据源的结构为:Map
其key为lookup key
。
我们来看官方对这个方法的注释:
lookup key通常是绑定在线程上下文中,根据这个key去resolvedDataSources
中取出DataSource。
根据目标数据源的管理方式不同,可以使用基于配置文件和数据库表两种方式。基于配置文件管理方案无法后续添加新的数据源,而基于数据库表方案管理,则更加灵活。
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
3. 配置文件解决方案
根据上面的分析,我们可以按照下面的步骤去实现:
-
定义
DynamicDataSource
类继承AbstractRoutingDataSource
,重写determineCurrentLookupKey()
方法。 -
配置多个数据源注入
targetDataSources
和defaultTargetDataSource
,通过afterPropertiesSet()
方法将数据源写入resolvedDataSources
和resolvedDefaultDataSource
。 -
调用
AbstractRoutingDataSource
的getConnection()
方法时,determineTargetDataSource()
方法返回DataSource
执行底层的getConnection()
。
其流程如下图所示:
3.1 创建数据源
DynamicDataSource
数据源的注入,目前业界主流实现步骤如下:
在配置文件中定义数据源
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driverClassName=com.mysql.jdbc.Driver
#主数据源
spring.datasource.druid.master.url=jdbcUrl
spring.datasource.druid.master.username=***
spring.datasource.druid.master.password=***
#其他数据源
spring.datasource.druid.second.url=jdbcUrl
spring.datasource.druid.second.username=***
spring.datasource.druid.second.password=***
在代码中配置Bean
@Configuration
publicclassDynamicDataSourceConfig{
@Bean
@ConfigurationProperties("spring.datasource.druid.master")
publicDataSourcefirstDataSource(){
returnDruidDataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties("spring.datasource.druid.second")
publicDataSourcesecondDataSource(){
returnDruidDataSourceBuilder.create().build();
}
@Bean
@Primary
publicDynamicDataSourcedataSource(DataSourcefirstDataSource,DataSourcesecondDataSource){
Map
3.2 AOP处理
通过DataSourceAspect
切面技术来简化业务上的使用,只需要在业务方法添加@SwitchDataSource
注解即可完成动态切换:
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public@interfaceSwitchDataSource{
Stringvalue();
}
DataSourceAspect
拦截业务方法,更新当前线程上下文DataSourceContextHolder
中存储的key,即可实现数据源切换。
3.3 方案不足
基于AbstractRoutingDataSource
的多数据源动态切换,有个明显的缺点,无法动态添加和删除数据源。在我们的产品中,不能把应用数据源写死在配置文件。接下来分享一下基于数据库表的实现方案。
4. 数据库表解决方案
我们需要实现可视化的数据源管理,并实时查看数据源的运行状态。所以我们不能把数据源全部配置在文件中,应该将数据源定义保存到数据库表。参考AbstractRoutingDataSource
的设计思路,实现自定义数据源管理。
4.1 设计数据源表
主库的数据源信息仍然配置在项目配置文件中,应用库数据源配置参数,则设计对应的数据表。表结构如下所示:
这个表主要就是DataSource
的相关配置参数,其相应的ORM操作代码在此不再赘述,主要是实现数据源的增删改查操作。
4.2 自定义数据源管理
4.2.1 定义管理接口
通过继承AbstractDataSource
即可实现DynamicDataSource
。为了方便对数据源进行操作,我们定义一个接口DataSourceManager
,为业务提供操作数据源的统一接口。
publicinterfaceDataSourceManager{
voidput(Stringvar1,DataSourcevar2);
DataSourceget(Stringvar1);
BooleanhasDataSource(Stringvar1);
voidremove(Stringvar1);
voidcloseDataSource(Stringvar1);
Collectionall() ;
}
该接口主要是对数据表中定义的数据源,提供基础管理功能。
4.2.2 自定义数据源
DynamicDataSource
的实现如下图所示:
根据前面的分析,AbstractRoutingDataSource
是在容器启动的时候,执行afterPropertiesSet
注入数据源对象,完成之后无法对数据源进行修改。DynamicDataSource
则实现DataSourceManager
接口,可以将数据表中的数据源加载到dataSources。
4.2.3 切面处理
这一块的处理跟配置文件数据源方案处理方式相同,都是通过AOP技术切换lookup key。
publicDataSourcedetermineTargetDataSource(){
StringlookupKey=DataSourceContextHolder.getKey();
DataSourcedataSource=Optional.ofNullable(lookupKey)
.map(dataSources::get)
.orElse(defaultDataSource);
if(dataSource==null){
thrownewIllegalStateException("CannotdetermineDataSourceforlookupkey["+lookupKey+"]");
}
returndataSource;
}
4.2.4 管理数据源状态
在项目启动的时候,加载数据表中的所有数据源,并执行初始化。初始化操作主要是使用SpringBoot提供的DataSourceBuilder
类,根据数据源表的定义创建DataSource。在项目运行过程中,可以使用定时任务对数据源进行保活,为了提升性能再添加一层缓存。
AbstractRoutingDataSource
只支持单库事务,切换数据源是在开启事务之前执行。 Spring使用 DataSourceTransactionManager
进行事务管理。开启事务,会将数据源缓存到DataSourceTransactionObject
对象中,后续的commit和 rollback事务操作实际上是使用的同一个数据源。
如何解决切库事务问题?借助Spring的声明式事务处理,我们可以在多次切库操作时强制开启新的事务:
@SwitchDataSource
@Transactional(rollbackFor=Exception.class,propagation=Propagation.REQUIRES_NEW)
这样的话,执行切库操作的时候强制启动新事务,便可实现多次切库而且事务能够生效。但是这种事务方式,存在数据一致性问题:
假若ServiceB正常执行提交事务,接着返回ServiceA执行并且发生异常。因为两次处理是不同的事务,ServiceA这个事务执行回滚,而ServiceA事务已经提交。这样的话,数据就不一致了。接下来,我们主要讨论如何解决多库的事务问题。
6. 多库事务处理
6.1 关于事务的理解
首先有必要理解事务的本质。
1.提到Spring事务,就离不开事务的四大特性和隔离级别、七大传播特性。
事务特性和离级别是属于数据库范畴。Spring事务的七大传播特性是什么呢?它是Spring在当前线程内,处理多个事务操作时的事务应用策略,数据库事务本身并不存在传播特性。
2.Spring事务的定义包括:begin、commit、rollback、close、suspend、resume等动作。
-
begin(事务开始): 可以认为存在于数据库的命令中,比如Mysql的
start transaction
命令,但是在JDBC编程方式中不存在。 -
close(事务关闭): Spring事务的close()方法,是把
Connection
对象归还给数据库连接池,与事务无关。 -
suspend(事务挂起): Spring中事务挂起的语义是:需要新事务时,将现有的
Connection
保存起来(还有尚未提交的事务),然后创建新的Connection2
,Connection2
提交、回滚、关闭完毕后,再把Connection1
取出来继续执行。 - resume(事务恢复): 嵌套事务执行完毕,返回上层事务重新绑定连接对象到事务管理器的过程。
实际上,只有commit、rollback、close是在JDBC真实存在的,而其他动作都是应用的语意,而非JDBC事务的真实命令。因此,事务真实存在的方法是:setAutoCommit()
、commit()
、rollback()
。
close()语义为:
- 关闭一个数据库连接,这已经不再是事务的方法了。
使用DataSource并不会执行物理关闭,只是归还给连接池。
6.2 自定义管理事务
为了保证在多个数据源中事务的一致性,我们可以手动管理Connetion
的事务提交和回滚。考虑到不同ORM框架的事务管理实现差异,要求实现自定义事务管理不影响框架层的事务。
这可以通过使用装饰器设计模式,对Connection
进行包装重写commit和rolllback屏蔽其默认行为,这样就不会影响到原生Connection
和ORM框架的默认事务行为。其整体思路如下图所示:
这里并没有使用前面提到的@SwitchDataSource
,这是因为我们在TransactionAop
中已经执行了lookupKey的切换。
6.2.1 定义多事务注解
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public@interfaceMultiTransaction{
StringtransactionManager()default"multiTransactionManager";
//默认数据隔离级别,随数据库本身默认值
IsolationLevelisolationLevel()defaultIsolationLevel.DEFAULT;
//默认为主库数据源
StringdatasourceId()default"default";
//只读事务,若有更新操作会抛出异常
booleanreadOnly()defaultfalse;
业务方法只需使用该注解即可开启事务,datasourceId
指定事务用到的数据源,不指定默认为主库。
6.2.3 包装Connection
自定义事务我们使用包装过的Connection
,屏蔽其中的commit&rollback
方法。这样我们就可以在主事务里进行统一的事务提交和回滚操作。
publicclassConnectionProxyimplementsConnection{
privatefinalConnectionconnection;
publicConnectionProxy(Connectionconnection){
this.connection=connection;
}
@Override
publicvoidcommit()throwsSQLException{
//connection.commit();
}
publicvoidrealCommit()throwsSQLException{
connection.commit();
}
@Override
publicvoidclose()throwsSQLException{
//connection.close();
}
publicvoidrealClose()throwsSQLException{
if(!connection.getAutoCommit()){
connection.setAutoCommit(true);
}
connection.close();
}
@Override
publicvoidrollback()throwsSQLException{
if(!connection.isClosed())
connection.rollback();
}
...
}
这里commit&close
方法不执行操作,rollback执行的前提是连接执行close才生效。这样不管是使用哪个ORM框架,其自身事务管理都将失效。事务的控制就交由MultiTransaction
控制了。
6.2.4 事务上下文管理
publicclassTransactionHolder{
//是否开启了一个MultiTransaction
privatebooleanisOpen;
//是否只读事务
privatebooleanreadOnly;
//事务隔离级别
privateIsolationLevelisolationLevel;
//维护当前线程事务ID和连接关系
privateConcurrentHashMapconnectionMap;
//事务执行栈
privateStackexecuteStack;
//数据源切换栈
privateStackdatasourceKeyStack;
//主事务ID
privateStringmainTransactionId;
//执行次数
privateAtomicIntegertransCount;
//事务和数据源key关系
privateConcurrentHashMapexecuteIdDatasourceKeyMap;
}
每开启一个事物,生成一个事务ID并绑定一个ConnectionProxy
。事务嵌套调用,保存事务ID和lookupKey至栈中,当内层事务执行完毕执行pop。这样的话,外层事务只需在栈中执行peek即可获取事务ID和lookupKey。
6.2.5 数据源兼容处理
为了不影响原生事务的使用,需要重写getConnection
方法。当前线程没有启动自定义事务,则直接从数据源中返回连接。
@Override
publicConnectiongetConnection()throwsSQLException{
TransactionHoldertransactionHolder=MultiTransactionManager.TRANSACTION_HOLDER_THREAD_LOCAL.get();
if(Objects.isNull(transactionHolder)){
returndetermineTargetDataSource().getConnection();
}
ConnectionProxyConnectionProxy=transactionHolder.getConnectionMap()
.get(transactionHolder.getExecuteStack().peek());
if(ConnectionProxy==null){
//没开跨库事务,直接返回
returndetermineTargetDataSource().getConnection();
}else{
transactionHolder.addCount();
//开了跨库事务,从当前线程中拿包装过的Connection
returnConnectionProxy;
}
}
6.2.6 切面处理
切面处理的核心逻辑是:维护一个嵌套事务栈,当业务方法执行结束,或者发生异常时,判断当前栈顶事务ID是否为主事务ID。如果是的话这时候已经到了最外层事务,这时才执行提交和回滚。详细流程如下图所示:
packagecom.github.mtxn.transaction.aop;
importcom.github.mtxn.application.Application;
importcom.github.mtxn.transaction.MultiTransactionManager;
importcom.github.mtxn.transaction.annotation.MultiTransaction;
importcom.github.mtxn.transaction.context.DataSourceContextHolder;
importcom.github.mtxn.transaction.support.IsolationLevel;
importcom.github.mtxn.transaction.support.TransactionHolder;
importcom.github.mtxn.utils.ExceptionUtils;
importlombok.extern.slf4j.Slf4j;
importorg.aspectj.lang.ProceedingJoinPoint;
importorg.aspectj.lang.annotation.Around;
importorg.aspectj.lang.annotation.Aspect;
importorg.aspectj.lang.annotation.Pointcut;
importorg.aspectj.lang.reflect.MethodSignature;
importorg.springframework.core.annotation.Order;
importorg.springframework.stereotype.Component;
importjava.lang.reflect.Method;
@Aspect
@Component
@Slf4j
@Order(99999)
publicclassMultiTransactionAop{
@Pointcut("@annotation(com.github.mtxn.transaction.annotation.MultiTransaction)")
publicvoidpointcut(){
if(log.isDebugEnabled()){
log.debug("startintransactionpointcut...");
}
}
@Around("pointcut()")
publicObjectaroundTransaction(ProceedingJoinPointpoint)throwsThrowable{
MethodSignaturesignature=(MethodSignature)point.getSignature();
//从切面中获取当前方法
Methodmethod=signature.getMethod();
MultiTransactionmultiTransaction=method.getAnnotation(MultiTransaction.class);
if(multiTransaction==null){
returnpoint.proceed();
}
IsolationLevelisolationLevel=multiTransaction.isolationLevel();
booleanreadOnly=multiTransaction.readOnly();
StringprevKey=DataSourceContextHolder.getKey();
MultiTransactionManagermultiTransactionManager=Application.resolve(multiTransaction.transactionManager());
//切数据源,如果失败使用默认库
if(multiTransactionManager.switchDataSource(point,signature,multiTransaction))returnpoint.proceed();
//开启事务栈
TransactionHoldertransactionHolder=multiTransactionManager.startTransaction(prevKey,isolationLevel,readOnly,multiTransactionManager);
Objectproceed;
try{
proceed=point.proceed();
multiTransactionManager.commit();
}catch(Throwableex){
log.error("executemethod:{}#{},err:",method.getDeclaringClass(),method.getName(),ex);
multiTransactionManager.rollback();
throwExceptionUtils.api(ex,"系统异常:%s",ex.getMessage());
}finally{
//当前事务结束出栈
StringtransId=multiTransactionManager.getTrans().getExecuteStack().pop();
transactionHolder.getDatasourceKeyStack().pop();
//恢复上一层事务
DataSourceContextHolder.setKey(transactionHolder.getDatasourceKeyStack().peek());
//最后回到主事务,关闭此次事务
multiTransactionManager.close(transId);
}
returnproceed;
}
}
7.总结
本文主要介绍了多数据源管理的解决方案(应用层事务,而非XA二段提交保证),以及对多个库同时操作的事务管理。
需要注意的是,这种方式只适用于单体架构的应用。因为多个库的事务参与者都是运行在同一个JVM进行。如果是在微服务架构的应用中,则需要使用分布式事务管理(譬如:Seata)。
审核编辑 :李倩
-
spring
+关注
关注
0文章
340浏览量
14341 -
数据源
+关注
关注
1文章
63浏览量
9677 -
微服务
+关注
关注
0文章
137浏览量
7348 -
SpringBoot
+关注
关注
0文章
173浏览量
177
原文标题:SpringBoot 多数据源及事务解决方案
文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。
发布评论请先 登录
相关推荐
评论