SpringBoot实现分库分表

网友投稿 339 2022-08-27


SpringBoot实现分库分表

目录一、statementHandler对象的定义二、prepare方法1、首先prepare方法是用来编译SQL2、那就是之前说的那几个具体的StatementHandler对象3、parameterize方法4、query/update方法

方案:可以使用拦截器拦截mybatis框架,在执行SQL前对SQL语句根据路由字段进行分库分表操作,下例只做分表功能

@Intercepts:申明需要拦截的方法

拦截StatementHandler对象

一、statementHandler对象的定义

首先我们先来看看statementHandler接口的定义:

首先约定文中将的四大对象是指:executor, statementHandler,parameterHandler,resultHandler对象。

SimpleStatementHandler:对应我们JDBC中常用的Statement接口,用于简单SQL的处理;PreparedStatementHandler:对应JDBC中的PreparedStatement,预编译SQL的接口;CallableStatementHandler:对应JDBC中CallableStatement,用于执行存储过程相关的接口;RoutingStatementHandler:这个接口是以上三个接口的路由,没有实际操作,只是负责上面三个StatementHandler的创建及调用。

讲到statementHandler,毫无疑问它是我们四大对象最重要的一个,它的任务就是和数据库对话。在它这里会使用parameterHandler和ResultHandler对象为我们绑定SQL参数和组装最后的结果返回。

public interface StatementHandler {

Statement prepare(Connection connection)

throws SQLException;

void parameterize(Statement statement)

throws SQLException;

void batch(Statement statement)

throws SQLException;

int update(Statement statement)

throws SQLException;

List query(Statement statement, ResultHandler resultHandler)

throws SQLException;

BoundSql getBoundSql();

ParameterHandler getParameterHandler();

}

二、prepare方法

1、首先prepare方法是用来编译SQL

让我们看看它的源码实现。这里我们看到了BaseStatementHandler对prepare方法的实现

@Override

public Statement prepare(Connection connection) throws SQLException {

ErrorContext.instance().sql(boundSql.getSql());

Statement statement = null;

try {

statement = instantiateStatement(connection);

setStatementTimeout(statement);

setFetchSize(statement);

return statement;

} catch (SQLException e) {

closeStatement(statement);

throw e;

} catch (Exception e) {

closeStatement(statement);

throw new ExecutorException("Error preparing statement. Cause: " + e, e);

}

}

protected abstract Statement instantiateStatement(Connection connection) throws SQLException;

显然我们通过源码更加关注抽象方法instantiateStatement是做了什么事情。它依旧是一个抽象方法,那么它就有其实现类。

2、那就是之前说的那几个具体的StatementHandler对象

让我们看看PreparedStatementHandler:

@Override

protected Statement instantiateStatement(Connection connection) throws SQLException {

String sql = boundSql.getSql();

if (mappedStatement.getKeyGenerator() instanceof Jdbc3KeyGenerator) {

String[] keyColumnNames = mappedStatement.getKeyColumns();

if (keyColumnNames == null) {

return connection.prepareStatement(sql, PreparedStatement.RETURN_GENERATED_KEYS);

} else {

return connection.prepareStatement(sql, keyColumnNames);

}

} else if (mappedStatement.getResultSetType() != null) {

return connection.prepareStatement(sql, mappedStatement.getResultSetType().getValue(), ResultSet.CONCUR_READ_ONLY);

} else {

return connection.prepareStatement(sql);

}

}

好这个方法非常简单,我们可以看到它主要是根据上下文来预编译SQL,这是我们还没有设置参数。设置参数的任务是交由,statement接口的parameterize方法来实现的。

3、parameterize方法

上面我们在prepare方法里面预编译了SQL。那么我们这个时候希望设置参数。在Statement中我们是使用parameterize方法进行设置参数的。

让我们看看PreparedStatementHandler中的parameterize方法:

@Override

public void parameterize(Statement statement) throws SQLException {

parameterHandler.setParameters((PreparedStatement) statement);

}

很显然这里很简单是通过parameterHandler来实现的,我们这篇文章只是停留在statementhandler的程度,等我们讲解parameterHandler的时候再来看它如何实现吧,期待一下吧。

4、query/update方法

我们用了prepare方法预编译了SQL,用了parameterize方法设置参数,那么我们接下来肯定是想执行SQL,而SQL无非是两种:

一种是进行查询——query,另外就是更新——update。

这些方法都很简单,让我们看看PreparedStatementHandler的实现:

@Override

public int update(Statement statement) throws SQLException {

PreparedStatement ps = (PreparedStatement) statement;

ps.execute();

int rows = ps.getUpdateCount();

Object parameterObject = boundSql.getParameterObject();

KeyGenerator keyGenerator = mappedStatement.getKeyGenerator();

keyGenerator.processAfter(executor, mappedStatement, ps, parameterObject);

return rows;

}

@Override

public List query(Statement statement, ResultHandler resultHandler) throws SQLException {

PreparedStatement ps = (PreparedStatement) statement;

ps.execute(); http://

return resultSetHandler. handleResultSets(ps);

}

例:动态替换SQL中@TableID标识符

package com.study.demo.interceptor;

import com.study.demo.exception.BaseException;

import org.apache.ibatis.executor.statement.StatementHandler;

import org.apache.ibatis.mapping.BoundSql;

import org.apache.ibatis.plugin.Interceptor;

import org.apache.ibatis.plugin.Intercepts;

import org.apache.ibatis.plugin.Invocation;

import org.apache.ibatis.plugin.Plugin;

import org.apache.ibatis.plugin.Signature;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.stereotype.Component;

import org.springframework.util.CollectionUtils;

import java.lang.reflect.Field;

import java.sql.Connection;

import java.util.Map;

import java.util.Properties;

import java.util.Set;

@Component

@Intercepts({

@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})})

public class DynamicSQLInterceptor implements Interceptor {

private static final Logger LOGGER = LoggerFactory.getLogger(DynamicSQLInterceptor.class);

private static final String SHARD_TABLE_ID = "SHARD_TABLE_ID";

private static final String DEFAULT_TABLE_ID = "000";

@Override

@SuppressWarnings("unchecked")

public Object intercept(Invocation invocation) throws Throwable {

LOGGER.info("DynamicSQLInterceptor.intercept() exec.");

StatementHandler statementHandler = (StatementHandler) invocation.getTarget();

Object parameter = statementHandler.getParameterHandler().getParameterObject();

Map params = (Map)parameter;

if(CollectionUtils.isEmpty(params)){

throw new BaseException("SQL: 路由字段不能为空!");

}

String tableId = DEFAULT_TABLE_ID;

Set keySet = params.keySet();

for (String key : keySet) {

if (SHARD_TABLE_ID.equals(key)) {

tableId = String.valueOf(params.get(key));

}

}

BoundSql boundSql = statementHandler.getBoundSql();

//获取到原始sql语句

String sql = boundSql.getSql();

String newSql = sql.replaceAll("@TableID", tableId);

LOGGER.debug("[DynamicSQLInterceptor] Sql:{}", newSql);

//通过反射修改sql语句

Field field = boundSql.getClass().getDeclaredField("sql");

field.setAccessible(true);

field.set(boundSql, newSql);

return invocation.proceed();

}

@Override

public Object plugin(Object target) {

//只拦截Executor对象,减少目标被代理的次数

if (target instanceof StatementHandler) {

return Plugin.wrap(target, this);

} else {

return target;

}

}

@Override

public void setProperties(Properties properties) {

LOGGER.debug("[DynamicSQLInterceptor] SetProperties");

}

}

示例SQL:

SELECT * FROM ST_CLASS_@TableID WHERE ID = #{id}

service层示例:

@Override

public Objcet queryByPrimaryKey(String id) {

Map params = DbShardUtils.shardDBParamMap(id);

params.put("id", id);

return testDao.queryByPrimaryKey(params);

}

dao层示例:

@Repository

public interface TestDao {

Object queryByPrimaryKey(Map params);

}

package com.study.demo.utils;

import org.apache.commons.lang3.StringUtils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.util.HashMap;

import java.util.Map;

/**

* 分库分表工具类

* 返回Map, 含有key:SHARD_TABLE_ID

*/

public class DbShardUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(DbShardUtils.class);

private static final String SHARD_TABLE_ID = "SHARD_TABLE_ID";

/**

* 私有构造函数

*/

private DbShardUtils() {

}

public static Map shardDBParamMap(String id){

if (StringUtils.isBlank(id)) {

LOGGER.error("sharding id is null");

}

Map paramMap = new HashMap<>();

paramMap.put(SHARD_TABLE_ID, rout(id));

return paramMap;

}

private static String rout(String id) {

// 测试

return "000";

}

}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Python小记——None和False(python的none什么意思)
下一篇:Python小记——怎么写好一个Python函数?(用python写函数)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~