博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
springboot mysql 多数据源配置,可实现读写分离
阅读量:6292 次
发布时间:2019-06-22

本文共 12927 字,大约阅读时间需要 43 分钟。

hot3.png

1、代码实现

import com.zaxxer.hikari.HikariDataSource;import lombok.Data;import lombok.Getter;import lombok.Setter;import org.aopalliance.intercept.MethodInvocation;import org.springframework.aop.Advisor;import org.springframework.aop.support.AbstractGenericPointcutAdvisor;import org.springframework.aop.support.AopUtils;import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.context.ApplicationContextAware;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;import org.springframework.core.Ordered;import org.springframework.core.annotation.Order;import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;import org.springframework.transaction.interceptor.TransactionAttribute;import org.springframework.transaction.interceptor.TransactionInterceptor;import javax.servlet.ServletRequestEvent;import javax.servlet.ServletRequestListener;import javax.sql.DataSource;import java.lang.reflect.Field;import java.util.Arrays;import java.util.HashMap;import java.util.Map;import java.util.concurrent.atomic.AtomicInteger;@Configuration("dynamicDataSourceConfig")@ConditionalOnProperty(name = "dynamic.jdbc", havingValue = "true")@EnableConfigurationProperties(value = DynamicDataSourceConfig.DynamicDataSourceProperties.class)public class DynamicDataSourceConfig {    /**     * 动态数据源     * @param dynamicDataSourceProperties     * @return     */    @Primary    @Bean("dynamicDataSource")    public DataSource dataSource(DynamicDataSourceProperties dynamicDataSourceProperties) {        JdbcDataSourceProperties[] jdbcDataSourceProperties = dynamicDataSourceProperties.getDatasources();        if (jdbcDataSourceProperties == null) {            throw new IllegalArgumentException("can not find any available dynamic.datasource config");        }        Map
targetDataSources = new HashMap<>(); for (int i = 0; i < jdbcDataSourceProperties.length; ++i) { JdbcDataSourceProperties properties = jdbcDataSourceProperties[i]; if (properties.getDatasourceId() == null || properties.getDatasourceId().trim().isEmpty()) { throw new IllegalArgumentException("dynamic.jdbc.datasources[" + i + "].datasourceId can not be null"); } if (targetDataSources.get(properties.getDatasourceId()) != null) { throw new IllegalArgumentException("dynamic.jdbc.datasources[" + i + "].datasourceId already exists"); } properties.setType(HikariDataSource.class); targetDataSources.put(properties.getDatasourceId(), properties.initializeDataSourceBuilder().build()); } DynamicDataSource dynamicDataSource = new DynamicDataSource(); dynamicDataSource.setTargetDataSources(targetDataSources); String[] readers = Arrays.asList(jdbcDataSourceProperties) .stream() .filter(e -> JdbcDataSourceProperties.READ_ACCESS_TYPE.equals(e.accessType)) .map(JdbcDataSourceProperties::getDatasourceId) .toArray(String[]::new); String[] writers = Arrays.asList(jdbcDataSourceProperties) .stream() .filter(e -> JdbcDataSourceProperties.WRITE_ACCESS_TYPE.equals(e.accessType)) .map(JdbcDataSourceProperties::getDatasourceId) .toArray(String[]::new); dynamicDataSource.setReaders(readers.length > 0 ? readers : null); dynamicDataSource.setWriters(writers.length > 0 ? writers : null); return dynamicDataSource; } @Bean("dynamicDataSourceServletRequestListener") @Order(Ordered.HIGHEST_PRECEDENCE) public ServletRequestListener servletRequestListener() { return new ServletRequestListener() { @Override public void requestDestroyed(ServletRequestEvent sre) { DynamicDataSource.clearTransactionDatasource(); } @Override public void requestInitialized(ServletRequestEvent sre) { DynamicDataSource.clearTransactionDatasource(); } }; } /** * 对aop中的Advisor注入 TransactionInterceptor 的代理类 * @return */ @Bean("dynamicDataSourceApplicationContextAware") public ApplicationContextAware applicationContextAware() { return (applicationContext -> { applicationContext.getBeansOfType(Advisor.class).values().stream().forEach(advisor -> { if (advisor.getAdvice() instanceof TransactionInterceptor) { TransactionInterceptorProxy proxy = new TransactionInterceptorProxy((TransactionInterceptor) advisor.getAdvice()); //尝试设置代理类 if (advisor instanceof AbstractGenericPointcutAdvisor) { ((AbstractGenericPointcutAdvisor) advisor).setAdvice(proxy); } else { //尝试通过反射代入 try { Class
clz = advisor.getClass(); TransactionInterceptor interceptor = null; while (interceptor == null && clz != null && clz != Object.class) { for (Field f : clz.getDeclaredFields()) { if (!f.isAccessible()) f.setAccessible(true); Object advice = f.get(advisor); if (advice == advisor.getAdvice()) { interceptor = (TransactionInterceptor) advice; f.set(advisor, proxy); break; } } clz = clz.getSuperclass(); } } catch (Throwable t) { // skip } } } }); }); } /** * 多数据源配置 */ @ConfigurationProperties("dynamic.jdbc") @Data public static class DynamicDataSourceProperties { private JdbcDataSourceProperties[] datasources; } /** * 继承springboot的数据源配置 */ @Data public static class JdbcDataSourceProperties extends DataSourceProperties { public static final String READ_ACCESS_TYPE = "read"; public static final String WRITE_ACCESS_TYPE = "write"; //确保唯一 private String datasourceId; //read 为读库,write 为写库,不配置为其他用途,比如生产环境配置相应的压力测试库等, // 该类型的数据源不参与正常的业务处理,除非程序主动选择 private String accessType; } /** * 数据源动态选择 */ public static class DynamicDataSource extends AbstractRoutingDataSource { private static final ThreadLocal
CURRENT_DATASOURCE = new ThreadLocal<>(); private static final ThreadLocal
FORCE_DATASOURCE = new ThreadLocal<>(); //写库,如果操作进来的一个是写,后面是读,那么都强制走读库,避免主从延迟,获取不到最新的数据 private static final ThreadLocal
CURRENT_WRITE_DATASOURCE = new ThreadLocal<>(); private static final ThreadLocal
CURRENT_TRANSACTION = ThreadLocal.withInitial(() -> new AtomicInteger(0)); @Getter @Setter private static String[] readers; @Getter @Setter private static String[] writers; private static final AtomicInteger readerCounter = new AtomicInteger(0); private static final AtomicInteger writerCounter = new AtomicInteger(0); static String getReader() { return readers != null ? readers[readerCounter.getAndIncrement() % readers.length] : null; } static String getWriter() { return writers != null ? writers[writerCounter.getAndIncrement() % writers.length] : null; } /** * 设置强制的数据源id * @param datasourceId */ public static void setCurrentDatasourceId(String datasourceId) { if (CURRENT_DATASOURCE.get() == null) { FORCE_DATASOURCE.set(datasourceId); } } public static String getCurrentDatasourceId() { return FORCE_DATASOURCE.get(); } public static void setCurrentTransactionDatasource(TransactionAttribute transactionDatasource) { CURRENT_TRANSACTION.get().incrementAndGet(); //添加事务记录 if (CURRENT_DATASOURCE.get() != null) return;//如果已经选中了,不重置,嵌套事务才会这样,service相互调用 //是否强制使用 if (FORCE_DATASOURCE.get() != null) { CURRENT_DATASOURCE.set(FORCE_DATASOURCE.get()); return; } if (CURRENT_WRITE_DATASOURCE.get() != null) { CURRENT_DATASOURCE.set(CURRENT_WRITE_DATASOURCE.get()); return; } //是否只读 boolean readOnly = transactionDatasource != null && transactionDatasource.isReadOnly(); String datasource = readOnly ? getReader() : getWriter(); //没有配置读库,走写库 if (datasource == null && readOnly) { datasource = getWriter(); } if (datasource == null) { throw new IllegalArgumentException("can not find any datasource "); } if (!readOnly) { CURRENT_WRITE_DATASOURCE.set(datasource); } CURRENT_DATASOURCE.set(datasource); } public static void reSetCurrentTransactionDatasource() { AtomicInteger tras = CURRENT_TRANSACTION.get(); if (tras.get() == 0) return;// 不应该存在这种情况 int cnt = tras.decrementAndGet(); if (cnt > 0) return; //还有嵌套事务没有完成 CURRENT_DATASOURCE.set(null); } private static String getCurrentLookupKey() { String key = CURRENT_DATASOURCE.get(); if (key == null) key = CURRENT_WRITE_DATASOURCE.get(); if (key == null) key = getReader(); if (key == null) key = getWriter(); return key; } static void clearTransactionDatasource() { CURRENT_DATASOURCE.set(null); FORCE_DATASOURCE.set(null); CURRENT_WRITE_DATASOURCE.set(null); CURRENT_TRANSACTION.get().set(0); } @Override protected Object determineCurrentLookupKey() { return getCurrentLookupKey(); } } public static class TransactionInterceptorProxy extends TransactionInterceptor { private TransactionInterceptor source; public TransactionInterceptorProxy(TransactionInterceptor transactionInterceptor) { this.source = transactionInterceptor; } @Override public Object invoke(MethodInvocation invocation) throws Throwable { Class
targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); TransactionAttribute transactionAttribute = this.source.getTransactionAttributeSource().getTransactionAttribute(invocation.getMethod(), targetClass); try { if (transactionAttribute != null) DynamicDataSource.setCurrentTransactionDatasource(transactionAttribute); return this.source.invoke(invocation); } finally { if (transactionAttribute != null) DynamicDataSource.reSetCurrentTransactionDatasource(); } } }}

 

 

2、配置

dynamic.jdbc=truedynamic.jdbc.datasources[0].datasourceId=firstdynamic.jdbc.datasources[0].url=jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=falsedynamic.jdbc.datasources[0].username=readdynamic.jdbc.datasources[0].password=123456dynamic.jdbc.datasources[0].driver-class-name=com.mysql.cj.jdbc.Driver#read 为读库,write 为写库,不配置为其他用途,比如生产环境配置相应的压力测试库等dynamic.jdbc.datasources[0].accessType=readdynamic.jdbc.datasources[1].datasourceId=seconddynamic.jdbc.datasources[1].url=jdbc:mysql://127.0.0.1:3306/test2?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=falsedynamic.jdbc.datasources[1].username=readdynamic.jdbc.datasources[1].password=123456dynamic.jdbc.datasources[1].driver-class-name=com.mysql.cj.jdbc.Driverdynamic.jdbc.datasources[1].accessType=readdynamic.jdbc.datasources[2].datasourceId=thirddynamic.jdbc.datasources[2].url=jdbc:mysql://127.0.0.1:3306/test2?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=falsedynamic.jdbc.datasources[2].username=writedynamic.jdbc.datasources[2].password=123456dynamic.jdbc.datasources[2].driver-class-name=com.mysql.cj.jdbc.Driverdynamic.jdbc.datasources[2].accessType=write

 

转载于:https://my.oschina.net/linchuhao23/blog/3035824

你可能感兴趣的文章
Spring: IOC容器的实现
查看>>
Serverless五大优势,成本和规模不是最重要的,这点才是
查看>>
Nginx 极简入门教程!
查看>>
iOS BLE 开发小记[4] 如何实现 CoreBluetooth 后台运行模式
查看>>
Item 23 不要在代码中使用新的原生态类型(raw type)
查看>>
为网页添加留言功能
查看>>
JavaScript—数组(17)
查看>>
Android 密钥保护和 C/S 网络传输安全理论指南
查看>>
以太坊ERC20代币合约优化版
查看>>
Why I Began
查看>>
同一台电脑上Windows 7和Ubuntu 14.04的CPU温度和GPU温度对比
查看>>
js数组的操作
查看>>
springmvc Could not write content: No serializer
查看>>
Python系语言发展综述
查看>>
新手 开博
查看>>
借助开源工具高效完成Java应用的运行分析
查看>>
163 yum
查看>>
第三章:Shiro的配置——深入浅出学Shiro细粒度权限开发框架
查看>>
80后创业的经验谈(转,朴实但实用!推荐)
查看>>
让Windows图片查看器和windows资源管理器显示WebP格式
查看>>