Spring+MyBatis实现数据库读写分离方案

网友投稿 224 2023-06-17


Spring+MyBatis实现数据库读写分离方案

推荐第四种

方案1

通过MyBatis配置文件创建读写分离两个DataSource,每个SqlSessionFactoryBean对象的mapperLocations属性制定两个读写数据源的配置文件。将所有读的操作配置在读文件中,所有写的操作配置在写文件中。

优点:实现简单

缺点:维护麻烦,需要对原有的xml文件进行重新修改,不支持多读,不易扩展

实现方式

destroy-method="close">

方案2

通过Spring AOP在业务层实现读写分离,在DAO层调用前定义切面,利用Spring的AbstractRoutingDataSource解决多数据源的问题,实现动态选择数据源

优点:通过注解的方法在DAO每个方法上配置数据源,原有代码改动量少,易扩展,支持多读

缺点:需要在DAO每个方法上配置注解,人工管理,容易出错

实现方式

//定义枚举类型,读写

public enum DynamicDataSourceGlobal {

READ, WRITE;

}

import java.lang.annotation.ElementType;

import java.lang.annotation.Retention;

import java.lang.annotation.RetentionPolicy;

import java.lang.annotation.Target;

/**

* RUNTIME

* 定义注解

* 编译器将把注释记录在类文件中,在运行时 VM 将保留注释,因此可以反射性地读取。

* @author shma1664

*

*/

@Retention(RetentionPolicy.RUNTIME)

@Target(ElementType.METHOD)

public @interface DataSource {

public DynamicDataSourceGlobal value() default DynamicDataSourceGlobal.READ;

}

/**

* Created by IDEA

* 本地线程设置和获取数据源信息

* User: mashaohua

* Date: 2016-07-07 13:35

* Desc:

*/

public class DynamicDataSourceHolder {

private static final ThreadLocal holder = new ThreadLocal();

public static void putDataSource(DynamicDataSourceGlobal dataSource){

holder.set(dataSource);

}

public static DynamicDataSourceGlobal getDataSource(){

return holder.get();

}

public static void clearDataSource() {

holder.remove();

}

}

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.concurrent.ThreadLocalRandom;

import java.util.concurrent.atomic.AtomicLong;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

/**

* Created by IDEA

* User: mashaohua

* Date: 2016-07-14 10:56

* Desc: 动态数据源实现读写分离

*/

public class DynamicDataSource extends AbstractRoutingDataSource {

private Object writeDataSource; //写数据源

private List readDataSources; //多个读数据源

private int readDataSourceSize; //读数据源个数

private int readDataSourcePollPattern = 0; //获取读数据源方式,0:随机,1:轮询

private AtomicLong counter = new AtomicLong(0);

private static final Long MAX_POOL = Long.MAX_VALUE;

private final Lock lock = new ReentrantLock();

@Override

public void afterPropertiesSet() {

if (this.writeDataSource == null) {

throw new IllegalArgumentException("Property 'writeDataSource' is required");

}

setDefaultTargetDataSource(writeDataSource);

Map targetDataSources = new HashMap<>();

targetDataSources.put(DynamicDataSourceGlobal.WRITE.name(), writeDataSource);

if (this.readDataSources == null) {

readDataSourceSize = 0;

} else {

for(int i=0; i

targetDataSources.put(DynamicDataSourceGlobal.READ.name() + i, readDataSources.get(i));

}

readDataSourceSize = readDataSources.size();

}

setTargetDataSources(targetDataSources);

super.afterPropertiesSet();

}

@Override

protected Object determineCurrentLookupKey() {

DynamicDataSourceGlobal dynamicDataSourceGlobal = DynamicDataSourceHolder.getDataSource();

if(dynamicDataSourceGlobal == null

|| dynamicDataSourceGlobal == DynamicDataSourceGlobal.WRITE

|| readDataSourceSize <= 0) {

return DynamicDataSourceGlobal.WRITE.name();

}

int index = 1;

if(readDataSourcePollPattern == 1) {

//轮询方式

long currValue = counter.incrementAndGet();

if((currValue + 1) >= MAX_POOL) {

try {

lock.lock();

if((currValue + 1) >= MAX_POOL) {

counter.set(0);

}

} finally {

lock.unlock();

}

}

index = (int) (currValue % readDataSourceSize);

} else {

//随机方式

index = ThreadLocalRandom.current().nextInt(0, readDataSourceSize);

}

return dynamicDataSourceGlobal.name() + index;

}

public void setWriteDataSource(Object writeDataSource) {

this.writeDataSource = writeDataSource;

}

public void setReadDataSources(List readDataSources) {

this.readDataSources = readDataSources;

}

public void setReadDataSourcePollPattern(int readDataSourcePollPattern) {

this.readDataSourcePollPattern = readDataSourcePollPattern;

}

}

import org.apache.log4j.Logger;

import org.aspectj.lang.JoinPoint;

import org.aspectj.lang.reflect.MethodSignature;

import java.lang.reflect.Method;

/**

* Created by IDEA

* User: mashaohua

* Date: 2016-07-07 13:39

* Desc: 定义选择数据源切面

*/

public class DynamicDataSourceAspect {

private static final Logger logger = Logger.getLogger(DynamicDataSourceAspect.class);

public void pointCut(){};

public void before(JoinPoint point)

{

Object target = point.getTarget();

String methodName = point.getSignature().getName();

Class[] clazz = target.getClass().getInterfaces();

Class[] parameterTypes = ((MethodSignature) point.getSignature()).getMethod().getParameterTypes();

try {

Method method = clazz[0].getMethod(methodName, parameterTypes);

if (method != null && method.isAnnotationPresent(DataSource.class)) {

DataSource data = method.getAnnotation(DataSource.class);

DynamicDataSourceHolder.putDataSource(data.value());

}

} catch (Exception e) {

logger.error(String.format("Choose DataSource error, method:%s, msg:%s", methodName, e.getMessage()));

}

}

public void after(JoinPoint point) {

DynamicDataSourceHolder.clearDataSource();

}

}

xmlns:xsi="http://w3.org/2001/XMLSchema-instance"

xmlns:tx="http://springframework.org/schema/tx"

xmlns:aop="http://springframework.org/schema/aop"

xsi:schemaLocation="http://springframework.org/schema/beans http://springframework.org/schema/beans/spring-beans-4.1.xsd

http://springframework.org/schema/tx http://springframework.org/schema/tx/spring-tx-4.1.xsd

http://springframework.org/schema/aop http://springframework.org/schema/aop/spring-aop-4.1.xsd">

方案3

通过Mybatis的Plugin在业务层实现数据库读写分离,在MyBatis创建Statement对象前通过拦截器选择真正的数据源,在拦截器中根据方法名称不同(select、update、insert、delete)选择数据源。

优点:原有代码不变,支持多读,易扩展

缺点:

实现方式

/**

* Created by IDEA

* User: mashaohua

* Date: 2016-07-19 15:40

* Desc: 创建Connection代理接口

*/

public interface ConnectionProxy extends Connection {

/**

* 根据传入的读写分离需要的key路由到正确的connection

* @param key 数据源标识

* @return

*/

Connection getTargetConnection(String key);

}

import java.lang.reflect.InvocationHandler;

import java.lang.reflect.InvocationTargetException;

import java.lang.reflect.Method;

import java.lang.reflect.Proxy;

import java.sql.Connection;

import java.sql.SQLException;

import java.util.ArrayList;

import java.util.List;

import java.util.logging.Logger;

import javax.sql.DataSource;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.jdbc.datasource.AbstractDataSource;

import org.springframework.jdbc.datasource.lookup.DataSourceLookup;

import org.springframework.jdbc.datasource.lookup.JndiDataSourceLookup;

import org.springframework.util.Assert;

public abstract class AbstractDynamicDataSourceProxy extends AbstractDataSource implements InitializingBean {

private List readDataSources;

private List resolvedReadDataSources;

private Object writeDataSource;

private DataSource resolvedWriteDataSource;

private int readDataSourcePollPattern = 0;

private int readDsSize;

private boolean defaultAutoCommit = true;

private int defaultTransactionIsolation = Connection.TRANSACTION_READ_COMMITTED;

public static final String READ = "read";

public static final String WRITE = "write";

private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();

@Override

public Connection getConnection() throws SQLException {

return (Connection) Proxy.newProxyInstance(

com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy.class.getClassLoader(),

new Class[] {com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy.class},

new RWConnectionInvocationHandler());

}

@Override

public Connection getConnection(String username, String password)

throws SQLException {

return (Connection) Proxy.newProxyInstance( com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy.class.getClassLoader(),

new Class[] {com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy.class},

new RWConnectionInvocationHandler(username,password));

}

public int getReadDsSize(){

return readDsSize;

}

public List getResolvedReadDataSources() {

return resolvedReadDataSources;

}

public void afterPropertiesSet() throws Exception {

if(writeDataSource == null){

throw new IllegalArgumentException("Property 'writeDataSource' is required");

}

this.resolvedWriteDataSource = resolveSpecifiedDataSource(writeDataSource);

resolvedReadDataSources = new ArrayList(readDataSources.size());

for(Object item : readDataSources){

resolvedReadDataSources.add(resolveSpecifiedDataSource(item));

}

readDsSize = readDataSources.size();

}

protected DataSource determineTargetDataSource(String key) {

Assert.notNull(this.resolvedReadDataSources, "DataSource router not initialized");

if(WRITE.equals(key)){

return resolvedWriteDataSource;

}else{

return loadReadDataSource();

}

}

public Logger getParentLogger() {

// NOOP Just ignore

return null;

}

/**

* 获取真实的data source

* @param dataSource (jndi | real data source)

* @return

* @throws IllegalArgumentException

*/

protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {

if (dataSource instanceof DataSource) {

return (DataSource) dataSource;

}

else if (dataSource instanceof String) {

return this.dataSourceLookup.getDataSource((String) dataSource);

}

else {

throw new IllegalArgumentException(

"Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);

}

}

protected abstract DataSource loadReadDataSource();

public void setReadDsSize(int readDsSize) {

this.readDsSize = readDsSize;

}

public List getReadDataSources() {

return readDataSources;

}

public void setReadDataSources(List readDataSources) {

this.readDataSources = readDataSources;

}

public Object getWriteDataSource() {

return writeDataSource;

}

public void setWriteDataSource(Object writeDataSource) {

this.writeDataSource = writeDataSource;

}

public void setResolvedReadDataSources(List resolvedReadDataSources) {

this.resolvedReadDataSources = resolvedReadDataSources;

}

public DataSource getResolvedWriteDataSource() {

return resolvedWriteDataSource;

}

public void setResolvedWriteDataSource(DataSource resolvedWriteDataSource) {

this.resolvedWriteDataSource = resolvedWriteDataSource;

}

public int getReadDataSourcePollPattern() {

return readDataSourcePollPattern;

}

public void setReadDataSourcePollPattern(int readDataSourcePollPattern) {

this.readDataSourcePollPattern = readDataSourcePollPattern;

}

/**

* Invocation handler that defers fetching an actual JDBC Connection

* until first creation of a Statement.

*/

private class RWConnectionInvocationHandler implements InvocationHandler {

private String username;

private String password;

private Boolean readOnly = Boolean.FALSE;

private Integer transactionIsolation;

private Boolean autoCommit;

private boolean closed = false;

private Connection target;

public RWConnectionInvocationHandler() {

}

public RWConnectionInvocationHandler(String username, String password) {

this();

this.username = username;

this.password = password;

}

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

// Invocation on ConnectionProxy interface coming in...

if (method.getName().equals("equals")) {

// We must avoid fetching a target Connection for "equals".

// Only consider equal when proxies are identical.

return (proxy == args[0] ? Boolean.TRUE : Boolean.FALSE);

}

else if (method.getName().equals("hashCode")) {

// We must avoid fetching a target Connection for "hashCode",

// and we must return the same hash code even when the target

// Connection has been fetched: use hashCode of Connection proxy.

return new Integer(System.identityHashCode(proxy));

}

else if (method.getName().equals("getTargetConnection")) {

// Handle getTargetConnection method: return underlying connection.

return getTargetConnection(method,args);

}

if (!hasTargetConnection()) {

// No physical target Connection kept yet ->

// resolve transaction demarcation methods without fetching

// a physical JDBC Connection until absolutely necessary.

if (method.getName().equals("toString")) {

return "RW Routing DataSource Proxy";

}

else if (method.getName().equals("isReadOnly")) {

return this.readOnly;

}

else if (method.getName().equals("setReadOnly")) {

this.readOnly = (Boolean) args[0];

return null;

}

else if (method.getName().equals("getTransactionIsolation")) {

if (this.transactionIsolation != null) {

return this.transactionIsolation;

}

return defaultTransactionIsolation;

// Else fetch actual Connection and check there,

// because we didn't have a default specified.

}

else if (method.getName().equals("setTransactionIsolation")) {

this.transactionIsolation = (Integer) args[0];

return null;

}

else if (method.getName().equals("getAutoCommit")) {

if (this.autoCommit != null)

return this.autoCommit;

return defaultAutoCommit;

// Else fetch actual Connection and check there,

// because we didn't have a default specified.

}

else if (method.getName().equals("setAutoCommit")) {

this.autoCommit = (Boolean) args[0];

return null;

}

else if (method.getName().equals("commit")) {

// Ignore: no statements created yet.

return null;

}

else if (method.getName().equals("rollback")) {

// Ignore: no statements created yet.

return null;

}

else if (method.getName().equals("getWarnings")) {

return null;

}

else if (method.getName().equals("clearWarnings")) {

return null;

}

else if (method.getName().equals("isClosed")) {

return (this.closed ? Boolean.TRUE : Boolean.FALSE);

}

else if (method.getName().equals("close")) {

// Ignore: no target connection yet.

this.closed = true;

return null;

}

else if (this.closed) {

// Connection proxy closed, without ever having fetched a

// physical JDBC Connection: throw corresponding SQLException.

throw new SQLException("Illegal operation: connection is closed");

}

}

// Target Connection already fetched,

// or target Connection necessary for current operation ->

// invoke method on target connection.

try {

return method.invoke(target, args);

}

catch (InvocationTargetException ex) {

throw ex.getTargetException();

}

}

/**

* Return whether the proxy currently holds a target Connection.

*/

private boolean hasTargetConnection() {

return (this.target != null);

}

/**

* Return the target Connection, fetching it and initializing it if necessary.

*/

private Connection getTargetConnection(Method operation,Object[] args) throws SQLException {

if (this.target == null) {

String key = (String) args[0];

// No target Connection held -> fetch one.

if (logger.isDebugEnabled()) {

logger.debug("Connecting to database for operation '" + operation.getName() + "'");

}

// Fetch physical Connection from DataSource.

this.target = (this.username != null) ?

determineTargetDataSource(key).getConnection(this.username, this.password) :

determineTargetDataSource(key).getConnection();

// If we still lack default connection properties, check them now.

//checkDefaultConnectionProperties(this.target);

// Apply kept transaction settings, if any.

if (this.readOnly.booleanValue()) {

this.target.setReadOnly(this.readOnly.booleanValue());

}

if (this.transactionIsolation != null) {

this.target.setTransactionIsolation(this.transactionIsolation.intValue());

}

if (this.autoCommit != null && this.autoCommit.booleanValue() != this.target.getAutoCommit()) {

this.target.setAutoCommit(this.autoCommit.booleanValue());

}

}

else {

// Target Connection already held -> return it.

if (logger.isDebugEnabled()) {

logger.debug("Using existing database connection for operation '" + operation.getName() + "'");

}

}

return this.target;

}

}

}

import javax.sql.DataSource;

import java.util.concurrent.ThreadLocalRandom;

import java.util.concurrent.atomic.AtomicLong;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

/**

* Created by IDEA

* User: mashaohua

* Date: 2016-07-19 16:04

* Desc:

*/

public class DynamicRoutingDataSourceProxy extends AbstractDynamicDataSourceProxy {

private AtomicLong counter = new AtomicLong(0);

private static final Long MAX_POOL = Long.MAX_VALUE;

private final Lock lock = new ReentrantLock();

@Override

protected DataSource loadReadDataSource() {

int index = 1;

if(getReadDataSourcePollPattern() == 1) {

//轮询方式

long currValue = counter.incrementAndGet();

if((currValue + 1) >= MAX_POOL) {

try {

lock.lock();

if((currValue + 1) >= MAX_POOL) {

counter.set(0);

}

} finally {

lock.unlock();

}

}

index = (int) (currValue % getReadDsSize());

} else {

//随机方式

index = ThreadLocalRandom.current().nextInt(0, getReadDsSize());

}

return getResolvedReadDataSources().get(index);

}

}

import org.apache.ibatis.executor.statement.RoutingStatementHandler;

import org.apache.ibatis.executor.statement.StatementHandler;

import org.apache.ibatis.mapping.MappedStatement;

import org.apache.ibatis.mapping.SqlCommandType;

import org.apache.ibatis.plugin.*;

import java.sql.Connection;

import java.util.Properties;

/**

* 拦截器

*/

@Intercepts({ @Signature(type = StatementHandler.class, method = "prepare", args = { Connection.class }) })

public class DynamicPlugin implements Interceptor {

public Object intercept(Invocation invocation) throws Throwable {

Connection conn = (Connection)invocation.getArgs()[0];

//如果是采用了我们代理,则路由数据源

if(conn instanceof com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy){

StatementHandler statementHandler = (StatementHandler) invocation

.getTarget();

MappedStatement mappedStatement = null;

if (statementHandler instanceof RoutingStatementHandler) {

StatementHandler delegate = (StatementHandler) ReflectionUtils

.getFieldValue(statementHandler, "delegate");

mappedStatement = (MappedStatement) ReflectionUtils.getFieldValue(

delegate, "mappedStatement");

} else {

mappedStatement = (MappedStatement) ReflectionUtils.getFieldValue(

statementHandler, "mappedStatement");

}

String key = AbstractDynamicDataSourceProxy.WRITE;

if(mappedStatement.getSqlCommandType() == SqlCommandType.SELECT){

key = AbstractDynamicDataSourceProxy.READ;

}else{

key = AbstractDynamicDataSourceProxy.WRITE;

}

ConnectionProxy connectionProxy = (ConnectionProxy)conn;

connectionProxy.getTargetConnection(key);

}

return invocation.proceed();

}

public Object plugin(Object target) {

return Plugin.wrap(target, this);

}

public void setProperties(Properties properties) {

//NOOP

}

}

import org.apache.ibatis.logging.Log;

import org.apache.ibatis.logging.LogFactory;

import java.lang.reflect.*;

public class ReflectionUtils {

private static final Log logger = LogFactory.getLog(ReflectionUtils.class);

/**

* 直接设置对象属性值,无视private/protected修饰符,不经过setter函数.

*/

public static void setFieldValue(final Object object, final String fieldName, final Object value) {

Field field = getDeclaredField(object, fieldName);

if (field == null)

throw new IllegalArgumentException("Could not find field [" + fieldName + "] on target [" + object + "]");

makeAccessible(field);

try {

field.set(object, value);

} catch (IllegalAccessException e) {

}

}

/**

* 直接读取对象属性值,无视private/protected修饰符,不经过getter函数.

*/

public static Object getFieldValue(final Object object, final String fieldName) {

Field field = getDeclaredField(object, fieldName);

if (field == null)

throw new IllegalArgumentException("Could not find field [" + fieldName + "] on target [" + object + "]");

makeAccessible(field);

Object result = null;

try {

result = field.get(object);

} catch (IllegalAccessException e) {

}

return result;

}

/**

* 直接调用对象方法,无视private/protected修饰符.

*/

public static Object invokeMethod(final Object object, final String methodName, final Class[] parameterTypes,

final Object[] parameters) throws InvocationTargetException {

Method method = getDeclaredMethod(object, methodName, parameterTypes);

if (method == null)

throw new IllegalArgumentException("Could not find method [" + methodName + "] on target [" + object + "]");

method.setAccessible(true);

try {

return method.invoke(object, parameters);

} catch (IllegalAccessException e) {

}

return null;

}

/**

* 循环向上转型,获取对象的DeclaredField.

*/

protected static Field getDeclaredField(final Object object, final String fieldName) {

for (Class superClass = object.getClass(); superClass != Object.class; superClass = superClass

.getSuperclass()) {

try {

return superClass.getDeclaredField(fieldName);

} catch (NoSuchFieldException e) {

}

}

return null;

}

/**

* 循环向上转型,获取对象的DeclaredField.

*/

protected static void makeAccessible(final Field field) {

if (!Modifier.isPublic(field.getModifiers()) || !Modifier.isPublic(field.getDeclaringClass().getModifiers())) {

field.setAccessible(true);

}

}

/**

* 循环向上转型,获取对象的DeclaredMethod.

*/

protected static Method getDeclaredMethod(Object object, String methodName, Class[] parameterTypes) {

for (Class superClass = object.getClass(); superClass != Object.class; superClass = superClass

.getSuperclass()) {

try {

return superClass.getDeclaredMethod(methodName, parameterTypes);

} catch (NoSuchMethodException e) {

}

}

return null;

}

/**

* 通过反射,获得Class定义中声明的父类的泛型参数的类型.

* eg.

* public UserDao extends HibernateDao

*

* @param clazz The class to introspect

* @return the first generic declaration, or Object.class if cannot be determined

*/

@SuppressWarnings("unchecked")

public static Class getSuperClassGenricType(final Class clazz) {

return getSuperClassGenricType(clazz, 0);

}

/**

* 通过反射,获得Class定义中声明的父类的泛型参数的类型.

* eg.

* public UserDao extends HibernateDao

*

* @param clazz The class to introspect

* @return the first generic declaration, or Object.class if cannot be determined

*/

@SuppressWarnings("unchecked")

public static Class getSuperClassGenricType(final Class clazz, final int index) {

Type genType = clazz.getGenericSuperclass();

if (!(genType instanceof ParameterizedType)) {

logger.warn(clazz.getSimpleName() + "'s superclass not ParameterizedType");

return Object.class;

}

Type[] params = ((ParameterizedType) genType).getActualTypeArguments();

if (index >= params.length || index < 0) {

logger.warn("Index: " + index + ", Size of " + clazz.getSimpleName() + "'s Parameterized Type: "

+ params.length);

return Object.class;

}

if (!(params[index] instanceof Class)) {

logger.warn(clazz.getSimpleName() + " not set the actual class on superclass generic parameter");

return Object.class;

}

return (Class) params[index];

}

/**

* 将反射时的checked exception转换为unchecked exception.

*/

public static IllegalArgumentException convertToUncheckedException(Exception e) {

if (e instanceof IllegalAccessException || e instanceof IllegalArgumentException

|| e instanceof NoSuchMethodException)

return new IllegalArgumentException("Refelction Exception.", e);

else

return new IllegalArgumentException(e);

}

}

"http://mybatis.org/dtd/mybatis-3-config.dtd">

xmlns:xsi="http://w3.org/2001/XMLSchema-instance"

xmlns:tx="http://springframework.org/schema/tx"

xmlns:aop="http://springframework.org/schema/aop"

xsi:schemaLocation="http://springframework.org/schema/beans http://springframework.org/schema/beans/spring-beans-4.1.xsd

http://springframework.org/schema/tx

http://springframework.org/schema/tx/spring-tx-4.1.xsd

http://springframework.org/schema/aop http://springframework.org/schema/aop/spring-aop-4.1.xsd">

方案4

如果你的后台结构是spring+mybatis,可以通过spring的AbstractRoutingDataSource和mybatis Plugin拦截器实现非常友好的读写分离,原有代码不需要任何改变。推荐第四种方案

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import java.util.HashMap;

import java.util.Map;

/**

* Created by IDEA

* User: mashaohua

* Date: 2016-07-14 10:56

* Desc: 动态数据源实现读写分离

*/

public class DynamicDataSource extends AbstractRoutingDataSource {

private Object writeDataSource; //写数据源

private Object readDataSource; //读数据源

@Override

public void afterPropertiesSet() {

if (this.writeDataSource == null) {

throw new IllegalArgumentException("Property 'writeDataSource' is required");

}

setDefaultTargetDataSource(writeDataSource);

Map targetDataSources = new HashMap<>();

targetDataSources.put(DynamicDataSourceGlobal.WRITE.name(), writeDataSource);

if(readDataSource != null) {

targetDataSources.put(DynamicDataSourceGlobal.READ.name(), readDataSource);

}

setTargetDataSources(targetDataSources);

super.afterPropertiesSet();

}

@Override

protected Object determineCurrentLookupKey() {

DynamicDataSourceGlobal dynamicDataSourceGlobal = DynamicDataSourceHolder.getDataSource();

if(dynamicDataSourceGlobal == null

|| dynamicDataSourceGlobal == DynamicDataSourceGlobal.WRITE) {

return DynamicDataSourceGlobal.WRITE.name();

}

return DynamicDataSourceGlobal.READ.name();

}

public void setWriteDataSource(Object writeDataSource) {

this.writeDataSource = writeDataSource;

}

public Object getWriteDataSource() {

return writeDataSource;

}

public Object getReadDataSource() {

return readDataSource;

}

public void setReadDataSource(Object readDataSource) {

this.readDataSource = readDataSource;

}

}

/**

* Created by IDEA

* User: mashaohua

* Date: 2016-07-14 10:58

* Desc:

*/

public enum DynamicDataSourceGlobal {

READ, WRITE;

}

public final class DynamicDataSourceHolder {

private static final ThreadLocal holder = new ThreadLocal();

private DynamicDataSourceHolder() {

//

}

public static void putDataSource(DynamicDataSourceGlobal dataSource){

holder.set(dataSource);

}

public static DynamicDataSourceGlobal getDataSource(){

return holder.get();

}

public static void clearDataSource() {

holder.remove();

}

}

import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import org.springframework.transaction.TransactionDefinition;

/**

* Created by IDEA

* User: mashaohua

* Date: 2016-08-10 14:34

* Desc:

*/

public class DynamicDataSourceTransactionManager extends DataSourceTransactionManager {

/**

* 只读事务到读库,读写事务到写库

* @param transaction

* @param definition

*/

@Override

protected void doBegin(Object transaction, TransactionDefinition definition) {

//设置数据源

boolean readOnly = definition.isReadOnly();

if(readOnly) {

DynamicDataSourceHolder.putDataSource(DynamicDataSourceGlobal.READ);

} else {

DynamicDataSourceHolder.putDataSource(DynamicDataSourceGlobal.WRITE);

}

super.doBegin(transaction, definition);

}

/**

* 清理本地线程的数据源

* @param transaction

*/

@Override

protected void doCleanupAfterCompletion(Object transaction) {

super.doCleanupAfterCompletion(transaction);

DynamicDataSourceHolder.clearDataSource();

}

}

import org.apache.ibatis.executor.Executor;

import org.apache.ibatis.executor.keygen.SelectKeyGenerator;

import org.apache.ibatis.mapping.BoundSql;

import org.apache.ibatis.mapping.MappedStatement;

import org.apache.ibatis.mapping.SqlCommandType;

import org.apache.ibatis.plugin.*;

import org.apache.ibatis.session.ResultHandler;

import org.apache.ibatis.session.RowBounds;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.util.Locale;

import java.util.Map;

import java.util.Properties;

import java.util.concurrent.ConcurrentHashMap;

/**

* Created by IDEA

* User: mashaohua

* Date: 2016-08-10 11:09

* Desc:

*/

@Intercepts({

@Signature(type = Executor.class, method = "update", args = {

MappedStatement.class, Object.class }),

@Signature(type = Executor.class, method = "query", args = {

MappedStatement.class, Object.class, RowBounds.class,

ResultHandler.class }) })

public class DynamicPlugin implements Interceptor {

protected static final Logger logger = LoggerFactory.getLogger(DynamicPlugin.class);

private static final String REGEX = ".*insert\\u0020.*|.*delete\\u0020.*|.*update\\u0020.*";

private static final Map cacheMap = new ConcurrentHashMap<>();

@Override

public Object intercept(Invocation invocation) throws Throwable {

boolean synchronizationActive = TransactionSynchronizationManager.isSynchronizationActive();

if(!synchronizationActive) {

Object[] objects = invocation.getArgs();

MappedStatement ms = (MappedStatement) objects[0];

DynamicDataSourceGlobal dynamicDataSourceGlobal = null;

if((dynamicDataSourceGlobal = cacheMap.get(ms.getId())) == null) {

//读方法

if(ms.getSqlCommandType().equals(SqlCommandType.SELECT)) {

//!selectKey 为自增id查询主键(SELECT LAST_INSERT_ID() )方法,使用主库

if(ms.getId().contains(SelectKeyGenerator.SELECT_KEY_SUFFIX)) {

dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;

} else {

BoundSql boundSql = ms.getSqlSource().getBoundSql(objects[1]);

String sql = boundSql.getSql().toLowerCase(Locale.CHINA).replaceAll("[\\t\\n\\r]", " ");

if(sql.matches(REGEX)) {

dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;

} else {

dynamicDataSourceGlobal = DynamicDataSourceGlobal.READ;

}

}

}else{

dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;

}

logger.warn("设置方法[{}] use [{}] Strategy, SqlCommandType [{}]..", ms.getId(), dynamicDataSourceGlobal.name(), ms.getSqlCommandType().name());

cacheMap.put(ms.getId(), dynamicDataSourceGlobal);

}

DynamicDataSourceHolder.putDataSource(dynamicDataSourceGlobal);

}

return invocation.proceed();

}

@Override

public Object plugin(Object target) {

if (target instanceof Executor) {

return Plugin.wrap(target, this);

} else {

return target;

}

}

@Override

public void setProperties(Properties properties) {

//

}

}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:微信小程序 页面跳转和数据传递实例详解
下一篇:Java分布式锁的三种实现方案
相关文章

 发表评论

暂时没有评论,来抢沙发吧~