springboot实现以代码的方式配置sharding

网友投稿 629 2022-09-12


springboot实现以代码的方式配置sharding

目录关于依赖shardingsphere-jdbc-core-spring-boot-startershardingsphere-jdbc-core数据源DataSource原DataSourceShardingJdbcDataSource完整的ShardingJdbcDataSource配置分表策略主要的类其他的分表配置类groovy行表达式说明properties配置Sharding-jdbc的坑结语

多数项目可能是已经运行了一段时间,才开始使用sharding-jdbc。

本教程就如何配置sharding-jdbc,才能使代码改动最少,对功能影响最少(如果已经做了垂直分表,只有一部分子项目需要水平分表)给出一个简单方案。

关于依赖

shardingsphere-jdbc-core-spring-boot-starter

官方给出了Spring Boot Starter配置

org.apache.shardingsphere

shardingsphere-jdbc-core-spring-boot-starter

${shardingsphere.version}

但是基于已有项目,添加shardingsphere自动配置是很恶心的事

为什么配置了某个数据连接池的spring-boot-starter(比如druid)和 shardingsphere-jdbc-spring-boot-starter 时,系统启动会报错?

回答:

1. 因为数据连接池的starter(比如druid)可能会先加载并且其创建一个默认数据源,这将会使得 ShardingSphere‐JDBC 创建数据源时发生冲突。

2. 解决办法为,去掉数据连接池的starter 即可,sharing‐jdbc 自己会创建数据连接池。

一般项目已经有自己的DataSource了,如果使用shardingsphere-jdbc的自动配置,就必须舍弃原有的DataSource。

shardingsphere-jdbc-core

为了不放弃原有的DataSource配置,我们只引入shardingsphere-jdbc-core依赖

org.apache.shardingsphere

sharding-jdbc-core

4.1.1

如果只水平分表,只支持mysql,可以排除一些无用的依赖

org.apache.shardingsphere

sharding-jdbc-core

4.1.1

org.apache.shardingsphere

shardingsphere-sql-parser-postgresql

org.apache.shardingsphere

shardingsphere-sql-parser-oracle

org.apache.shardingsphere

shardingsphere-sql-parser-sqlserver

org.apache.shardingsphere

encrypt-core-rewrite

org.apache.shardingsphere

shadow-core-rewrite

org.apache.shardingsphere

encrypt-core-merge

com.zaxxer

HikariCP

org.apache.commons

commons-dbcp2

commons-pool

commons-pool

com.h2database

h2

mysql

mysql-connector-java

org.postgresql

postgresql

com.microsoft.sqlserver

mssql-jdbc

数据源DataSource

原DataSource

以Druid为例,原配置为

package com.xxx.common.autoConfiguration;

import java.util.ArrayList;

import java.util.List;

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;

import org.springframework.boot.web.servlet.FilterRegistrationBean;

import org.springframework.boot.web.servlet.ServletRegistrationBean;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import com.alibaba.druid.filter.Filter;

import com.alibaba.druid.filter.logging.Slf4jLogFilter;

import com.alibaba.druid.filter.stat.StatFilter;

import com.alibaba.druid.pool.DruidDataSource;

import com.alibaba.druid.support.http.StatViewServlet;

import com.alibaba.druid.support.http.WebStatFilter;

import com.alibaba.druid.wall.WallConfig;

import com.alibaba.druid.wall.WallFilter;

import lombok.extern.slf4j.Slf4j;

/**

* @ClassName: DruidConfiguration

* @Description: Druid连接池配置

*/

@Configuration

@Slf4j

public class DruidConfiguration {

@Value("${spring.datasource.driver-class-name}")

private String driver;

@Value("${spring.datasource.url}")

private String url;

@Value("${spring.datasource.username}")

private String username;

@Value("${spring.datasource.password}")

private String password;

@Value("${datasource.druid.initialsize}")

private Integer druid_initialsize = 0;

@Value("${datasource.druid.maxactive}")

private Integer druid_maxactive = 20;

@Value("${datasource.druid.minidle}")

private Integer druid_minidle = 0;

@Value("${datasource.druid.maxwait}")

private Integer druid_maxwait = 30000;

@Bean

public ServletRegistrationBean druidServlet() {

ServletRegistrationBean reg = new ServletRegistrationBean();

reg.setServlet(new StatViewServlet());

reg.addUrlMappings("/druid/*");

reg.addInitParameter("loginUsername", "root");

reg.addInitParameter("loginPassword", "root!@#");

//reg.addInitParameter("logSlowSql", "");

return reg;

}

/**

*

* @Title: druidDataSource

* @Description: 数据库源Bean

* @param @return 参数说明

* @return DataSource 返回类型

* @throws

*/

@Bean

public DataSource druidDataSource() {

// 数据源

DruidDataSource druidDataSource = new DruidDataSource();

druidDataSource.setDriverClassName(driver); // 驱动

druidDataSource.setUrl(url); // 数据库连接地址

druidDataSource.setUsername(username); // 数据库用户名

druidDataSource.setPassword(password); // 数据库密码

druidDataSource.setInitialSize(druid_initialsize);// 初始化连接大小

druidDataSource.setMaxActive(druid_maxactive); // 连接池最大使用连接数量

druidDataSource.setMinIdle(druid_minidle); // 连接池最小空闲

druidDataSource.setMaxWait(druid_maxwait); // 获取连接最大等待时间

// 打开PSCache,并且指定每个连接上PSCache的大小

druidDataSource.setPoolPreparedStatements(false);

druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(33);

//druidDataSource.setValidationQuery("SELECT 1"); // 用来检测连接是否有效的sql

druidDataSource.setTestOnBorrow(false); // 申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。

druidDataSource.setTestOnReturn(false); // 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能

druidDataSource.setTestWhileIdle(false); // 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效

druidDataSource.setTimeBetweenLogStatsMillis(60000); // 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒

druidDataSource.setMinEvictableIdleTimeMillis(1800000); // 配置一个连接在池中最小生存的时间,单位是毫秒

// 当程序存在缺陷时,申请的连接忘记关闭,这时候,就存在连接泄漏

// 配置removeAbandoned对性能会有一些影响,建议怀疑存在泄漏之后再打开。在上面的配置中,如果连接超过30分钟未关闭,就会被强行回收,并且日志记录连接申请时的调用堆栈。

druidDataSource.setRemoveAbandoned(false); // 打开removeAbandoned功能

druidDataSource.setRemoveAbandonedTimeout(1800); // 1800秒,也就是30分钟

druidDataSource.setLogAbandoned(false); // 关闭abanded连接时输出错误日志

// 过滤器

List filters = new ArrayList();

filters.add(this.getStatFilter()); // 监控

//filters.add(this.getSlf4jLogFilter()); // 日志

filters.add(this.getWallFilter()); // 防火墙

druidDataSource.setProxyFilters(filters);

log.info("连接池配置信息:"+druidDataSource.getUrl());

return druidDataSource;

}

@Bean

public FilterRegistrationBean filterRegistrationBean() {

FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean();

WebStatFilter webStatFilter = new WebStatFilter();

filterRegistrationBean.setFilter(webStatFilter);

filterRegistrationBean.addUrlPatterns("/*");

filterRegistrationBean.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");

return filterRegistrationBean;

}

/**

*

* @Title: getStatFilter

* @Description: 监控过滤器

* @param @return 参数说明

* @return StatFilter 返回类型

* @throws

*/

public StatFilter getStatFilter(){

StatFilter sFilter = new StatFilter();

//sFilter.setSlowSqlMillis(2000); // 慢sql,毫秒时间

sFilter.setLogSlowSql(false); // 慢sql日志

sFilter.setMergeSql(true); // sql合并优化处理

return sFilter;

}

/**

*

* @Title: getSlf4jLogFilter

* @Description: 监控日志过滤器

* @param @return 参数说明

* @return Slf4jLogFilter 返回类型

* @throws

*/

public Slf4jLogFilter getSlf4jLogFilter(){

Slf4jLogFilter slFilter = new Slf4jLogFilter();

slFilter.setResultSetLogEnabled(false);

slFilter.setStatementExecutableSqlLogEnable(false);

return slFilter;

}

/**

*

* @Title: getWallFilter

* @Description: 防火墙过滤器

* @param @return 参数说明

* @return WallFilter 返回类型

* @throws

*/

public WallFilter getWallFilter(){

WallFilter wFilter = new WallFilter();

wFilter.setDbType("mysql");

wFilter.setConfig(this.getWallConfig());

wFilter.setLogViolation(true); // 对被认为是攻击的SQL进行LOG.error输出

wFilter.setThrowException(true); // 对被认为是攻击的SQL抛出SQLExcepton

return wFilter;

}

/**

*

* @Title: getWallConfig

* @Description: 数据防火墙配置

* @param @return 参数说明

* @return WallConfig 返回类型

* @throws

*/

public WallConfig getWallConfig(){

WallConfig wConfig = new WallConfig();

wConfig.setDir("META-INF/druid/wall/mysql"); // 指定配置装载的目录

// 拦截配置-语句

wConfig.setTruncateAllow(false); // truncate语句是危险,缺省打开,若需要自行关闭

wConfig.setCreateTableAllow(true); // 是否允许创建表

wConfig.setAlterTableAllow(false); // 是否允许执行Alter Table语句

wConfig.setDropTableAllow(false); // 是否允许修改表

// 其他拦截配置

wConfig.setStrictSyntaxCheck(true); // 是否进行严格的语法检测,Druid SQL Parser在某些场景不能覆盖所有的SQL语法,出现解析SQL出错,可以临时把这个选项设置为false,同时把SQL反馈给Druid的开发者

wConfig.setConditionOpBitwseAllow(true); // 查询条件中是否允许有"&"、"~"、"|"、"^"运算符。

wConfig.setMinusAllow(true); // 是否允许SELECT * FROM A MINUS SELECT * FROM B这样的语句

wConfig.setIntersectAllow(true); // 是否允许SELECT * FROM A INTERSECT SELECT * FROM B这样的语句

//wConfig.setMetadataAllow(false); // 是否允许调用Connection.getMetadata方法,这个方法调用会暴露数据库的表信息

return wConfig;

}

}

可见,如果用自动配置的方式放弃这些原有的配置风险有多大

怎么改呢?

ShardingJdbcDataSource

第一步,创建一个interface,用以加载自定义的分表策略

可以在各个子项目中创建bean,实现此接口

public interface ShardingRuleSupport {

void configRule(ShardingRuleConfiguration shardingRuleConfig);

}

第二步,在DruidConfiguration.class中注入所有的ShardingRuleSupport

@Autowired(required = false)

private List shardingRuleSupport;

第三步,创建sharding-jdbc分表数据源

//包装Druid数据源

Map dataSourceMap = new HashMap<>();

//自定义一个名称为ds0的数据源名称,包装原有的Druid数据源,还可以再定义多个数据源

//因为只分表不分库,所有定义一个数据源就够了

dataSourceMap.put("ds0", druidDataSource);

//加载分表配置

ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();

//要加载所有的ShardingRuleSupport实现bean,所以用for循环加载

for (ShardingRuleSupport support : shardingRuleSupport) {

support.configRule(shardingRuleConfig);

}

//加载其他配置

Properties properties = new Properties();

//由于未使用starter的自动装配,所以手动设置,是否显示分表sql

properties.put("sql.show", sqlShow);

//返回ShardingDataSource包装的数据源

return ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig, properties);

完整的ShardingJdbcDataSource配置

package com.xxx.common.autoConfiguration;

import java.util.ArrayList;

import java.util.List;

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;

import org.springframework.boot.web.servlet.FilterRegistrationBean;

import org.springframework.boot.web.servlet.ServletRegistrationBean;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import com.alibaba.druid.filter.Filter;

import com.alibaba.druid.filter.logging.Slf4jLogFilter;

import com.alibaba.druid.filter.stat.StatFilter;

import com.alibaba.druid.pool.DruidDataSource;

import com.alibaba.druid.support.http.StatViewServlet;

import com.alibaba.druid.support.http.WebStatFilter;

import com.alibaba.druid.wall.WallConfig;

import com.alibaba.druid.wall.WallFilter;

import lombok.extern.slf4j.Slf4j;

/**

* @ClassName: DruidConfiguration

* @Description: Druid连接池配置

*/

@Configuration

@Slf4j

public class DruidConfiguration {

@Value("${spring.datasource.driver-class-name}")

private String driver;

@Value("${spring.datasource.url}")

private String url;

@Value("${spring.datasource.username}")

private String username;

@Value("${spring.datasource.password}")

private String password;

@Value("${datasource.druid.initialsize}")

private Integer druid_initialsize = 0;

@Value("${datasource.druid.maxactive}")

private Integer druid_maxactive = 20;

@Value("${datasource.druid.minidle}")

private Integer druid_minidle = 0;

@Value("${datasource.druid.maxwait}")

private Integer druid_maxwait = 30000;

/**

* 默认不显示分表SQL

*/

@Value("${spring.shardingsphere.props.sql.show:false}")

private boolean sqlShow;

@Autowired(required = false)

private List shardingRuleSupport;

@Bean

public ServletRegistrationBean druidServlet() {

ServletRegistrationBean reg = new ServletRegistrationBean();

reg.setServlet(new StatViewServlet());

reg.addUrlMappings("/druid/*");

reg.addInitParameter("loginUsername", "root");

reg.addInitParameter("loginPassword", "root!@#");

//reg.addInitParameter("logSlowSql", "");

return reg;

}

/**

*

* @Title: druidDataSource

* @Description: 数据库源Bean

* @param @return 参数说明

* @return DataSource 返回类型

* @throws

*/

@Bean

public DataSource druidDataSource() {

// 数据源

DruidDataSource druidDataSource = new DruidDataSource();

druidDataSource.setDriverClassName(driver); // 驱动

druidDataSource.setUrl(url); // 数据库连接地址

druidDataSource.setUsername(username); // 数据库用户名

druidDataSource.setPassword(password); // 数据库密码

druidDataSource.setInitialSize(druid_initialsize);// 初始化连接大小

druidDataSource.setMaxActive(druid_maxactive); // 连接池最大使用连接数量

druidDataSource.setMinIdle(druid_minidle); // 连接池最小空闲

druidDataSource.setMaxWait(druid_maxwait); // 获取连接最大等待时间

// 打开PSCache,并且指定每个连接上PSCache的大小

druidDataSource.setPoolPreparedStatements(false);

druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(33);

//druidDataSource.setValidationQuery("SELECT 1"); // 用来检测连接是否有效的sql

druidDataSource.setTestOnBorrow(false); // 申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。

druidDataSource.setTestOnReturn(false); // 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能

druidDataSource.setTestWhileIdle(false); // 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效

druidDataSource.setTimeBetweenLogStatsMillis(60000); // 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒

druidDataSource.setMinEvictableIdleTimeMillis(1800000); // 配置一个连接在池中最小生存的时间,单位是毫秒

// 当程序存在缺陷时,申请的连接忘记关闭,这时候,就存在连接泄漏

// 配置removeAbandoned对性能会有一些影响,建议怀疑存在泄漏之后再打开。在上面的配置中,如果连接超过30分钟未关闭,就会被强行回收,并且日志记录连接申请时的调用堆栈。

druidDataSource.setRemoveAbandoned(false); // 打开removeAbandoned功能

druidDataSource.setRemoveAbandonedTimeout(1800); // 1800秒,也就是30分钟

druidDataSource.setLogAbandoned(false); // 关闭abanded连接时输出错误日志

// 过滤器

List filters = new ArrayList();

filters.add(this.getStatFilter()); // 监控

//filters.add(this.getSlf4jLogFilter()); // 日志

filters.add(this.getWallFilter()); // 防火墙

druidDataSource.setProxyFilters(filters);

log.info("连接池配置信息:"+druidDataSource.getUrl());

if (shardingRuleSupport == null || shardingRuleSupport.isEmpty()) {

log.info("............分表配置为空,使用默认的数据源............");

return druidDataSource;

}

log.info("++++++++++++加载sharding jdbc配置++++++++++++");

//包装Druid数据源

Map dataSourceMap = new HashMap<>();

//自定义一个名称为ds0的数据源名称,包装原有的Druid数据源,还可以再定义多个数据源

//因为只分表不分库,所有定义一个数据源就够了

dataSourceMap.put("ds0", druidDataSource);

//加载分表配置

ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();

//要加载所有的ShardingRuleSupport实现bean,所以用for循环加载

for (ShardingRuleSupport support : shardingRuleSupport) {

support.configRule(shardingRuleConfig);

}

//加载其他配置

Properties properties = new Properties();

//由于未使用starter的自动装配,所以手动设置,是否显示分表sql

properties.put("sql.show", sqlShow);

//返回ShardingDataSource包装的数据源

return ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig, properties);

}

@Bean

public FilterRegistrationBean filterRegistrationBean() {

FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean();

WebStatFilter webStatFilter = new WebStatFilter();

filterRegistrationBean.setFilter(webStatFilter);

filterRegistrationBean.addUrlPatterns("/*");

filterRegistrationBean.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");

return filterRegistrationBean;

}

/**

*

* @Title: getStatFilter

* @Description: 监控过滤器

* @param @return 参数说明

* @return StatFilter 返回类型

* @throws

*/

public StatFilter getStatFilter(){

StatFilter sFilter = new StatFilter();

//sFilter.setSlowSqlMillis(2000); // 慢sql,毫秒时间

sFilter.setLogSlowSql(false); // 慢sql日志

sFilter.setMergeSql(true); // sql合并优化处理

return sFilter;

}

/**

*

* @Title: getSlf4jLogFilter

* @Description: 监控日志过滤器

* @param @return 参数说明

* @return Slf4jLogFilter 返回类型

* @throws

*/

public Slf4jLogFilter getSlf4jLogFilter(){

Slf4jLogFilter slFilter = new Slf4jLogFilter();

slFilter.setResultSetLogEnabled(false);

slFilter.setStatementExecutableSqlLogEnable(false);

return slFilter;

}

/**

*

* @Title: getWallFilter

* @Description: 防火墙过滤器

* @param @return 参数说明

* @return WallFilter 返回类型

* @throws

*/

public WallFilter getWallFilter(){

WallFilter wFilter = new WallFilter();

wFilter.setDbType("mysql");

wFilter.setConfig(this.getWallConfig());

wFilter.setLogViolation(true); // 对被认为是攻击的SQL进行LOG.error输出

wFilter.setThrowException(true); // 对被认为是攻击的SQL抛出SQLExcepton

return wFilter;

}

/**

*

* @Title: getWallConfig

* @Description: 数据防火墙配置

* @param @return 参数说明

* @return WallConfig 返回类型

* @throws

*/

public WallConfig getWallConfig(){

WallConfig wConfig = new WallConfig();

wConfig.setDir("META-INF/druid/wall/mysql"); // 指定配置装载的目录

// 拦截配置-语句

wConfig.setTruncateAllow(false); // truncate语句是危险,缺省打开,若需要自行关闭

wConfig.setCreateTableAllow(true); // 是否允许创建表

wConfig.setAlterTableAllow(false); // 是否允许执行Alter Table语句

wConfig.setDropTableAllow(false); // 是否允许修改表

// 其他拦截配置

wConfig.setStrictSyntaxCheck(true); // 是否进行严格的语法检测,Druid SQL Parser在某些场景不能覆盖所有的SQL语法,出现解析SQL出错,可以临时把这个选项设置为false,同时把SQL反馈给Druid的开发者

wConfig.setConditionOpBitwseAllow(true); // 查询条件中是否允许有"&"、"~"、"|"、"^"运算符。

wConfig.setMinusAllow(true); // 是否允许SELECT * FROM A MINUS SELECT * FROM B这样的语句

wConfig.setIntersectAllow(true); // 是否允许SELECT * FROM A INTERSECT SELECT * FROM B这样的语句

//wConfig.setMetadataAllow(false); // 是否允许调用Connection.getMetadata方法,这个方法调用会暴露数据库的表信息

return wConfig;

}

}

分表策略

主要的类

创建几个ShardingRuleSupport接口的实现Bean

@Component

public class DefaultShardingRuleAdapter implements ShardingRuleSupport {

@Override

public void configRule(ShardingRuleConfiguration shardingRuleConfiguration) {

Collection tableRuleConfigs = shardingRuleConfiguration.getTableRuleConfigs();

TableRuleConfiguration ruleConfig1 = new TableRuleConfiguration("table_one", "ds0.table_one_$->{0..9}");

ComplexShardingStrategyConfiguration strategyConfig1 = new ComplexShardingStrategyConfiguration("column_id", new MyDefaultShardingAlgorithm());

ruleConfig1.setTableShardingStrategyConfig(strategyConfig1);

tableRuleConfigs.add(ruleConfig1);

TableRuleConfiguration ruleConfig2 = new TableRuleConfiguration("table_two", "ds0.table_two_$->{0..9}");

ComplexShardingStrategyConfiguration strategyConfig2 = new ComplexShttp://hardingStrategyConfiguration("column_id", new MyDefaultShardingAlgorithm());

ruleConfig2.setTableShardingStrategyConfig(strategyConfig2);

tableRuleConfigs.add(ruleConfig2);

}

}

@Component

public class CustomShardingRuleAdapter implements ShardingRuleSupport {

@Override

public void configRule(ShardingRuleConfiguration shardingRuleConfiguration) {

Collection tableRuleConfigs = shardingRuleConfiguration.getTableRuleConfigs();

TableRuleConfiguration ruleConfig1 = new TableRuleConfiguration(MyCustomShardingUtil.LOGIC_TABLE_NAME, MyCustomShardingUtil.ACTUAL_DATA_NODES);

ComplexShardingStrategyConfiguration strategyConfig1 = new ComplexShardingStrategyConfiguration(MyCustomShardingUtil.SHARDING_COLUMNS, new MyCustomShardingAlgorithm());

ruleConfig1.setTableShardingStrategyConfig(strategyConfig1);

tableRuleConfigs.add(ruleConfig1);

}

}

其他的分表配置类

public class MyDefaultShardingAlgorithm implements ComplexKeysShardingAlgorithm {

public String getShardingKey () {

return "column_id";

}

@Override

public Collection doSharding(Collection availableTargetNames, ComplexKeysShardingValue shardingValue) {

Collection col = new ArrayList<>();

String logicTableName = shardingValue.getLogicTableName() + "_";

Map availableTargetNameMap = new HashMap<>();

for (String targetName : availableTargetNameMap) {

String endStr = StringUtils.substringAfter(targetName, logicTableName);

availableTargetNameMap.put(endStr, targetName);

}

int size = availableTargetNames.size();

//=,in

Collection shardingColumnValues = shardingValue.getColumnNameAndShardingValuesMap().get(this.getShardingKey());

if (shardingColumnValues != null) {

for (String shardingColumnValue : shardingColumnValues) {

String modStr = Integer.toString(Math.abs(shardingColumnValue .hashCode()) % size);

String actualTableName = availableTargetNameMap.get(modStr);

if (StringUtils.isNotEmpty(actualTableName)) {

col.add(actualTableName);

}

}

}

//between and

//shardingValue.getColumnNameAndRangeValuesMap().get(this.getShardingKey());

... ...

//如果分表列不是有序的,则between and无意义,没有必要实现

return col;

}

}

public class MyCustomShardingAlgorithm extends MyDefaultShardingAlgorithm implements ComplexKeysShardingAlgorithm {

@Override

public String getShardingKey () {

return MyCustomShardingUtil.SHARDING_COLUMNS;

}

@Override

public Collection doSharding(Collection availableTargetNames, ComplexKeysShardingValue shardingValue) {

Collection col = new ArrayList<>();

String logicTableName = shardingValue.getLogicTableName() + "_";

Map availableTargetNameMap = new HashMap<>();

for (String targetName : availableTargetNameMap) {

String endStr = StringUtils.substringAfter(targetName, logicTableName);

availableTargetNameMap.put(endStr, targetName);

}

Map specialActualTableNameMap = MyCustomShardingUtil.getSpecialActualTableNameMap();

int count = (int) specialActualTableNameMap.values().stream().distinct().count();

int size = availableTargetNames.size() - count;

//=,in

Collection shardingColumnValues = shardingValue.getColumnNameAndShardingValuesMap().get(this.getShardingKey());

if (shardingColumnValues != null) {

for (String shardingColumnValue : shardingColumnValues) {

String specialActualTableName = specialActualTableNameMap.get(shardingColumnValue);

if (StringUtils.isNotEmpty(specialActualTableName)) {

col.add(specialActualTableName);

continue;

}

String modStr = Integer.toString(Math.abs(shardingColumnValue .hashCode()) % size);

String actualTableName = availableTargetNameMap.get(modStr);

if (StringUtils.isNotEmpty(actualTableName)) {

col.add(actualTableName);

}

}

}

//between and

//shardingValue.getColumnNameAndRangeValuesMap().get(this.getShardingKey());

... ...

//如果分表列不是有序的,则between and无意义,没有必要实现

return col;

}

}

@Component

public class MyCustomShardingUtil {

/**

* 逻辑表名

*/

public static final String LOGIC_TABLE_NAME = "table_three";

/**

* 分片字段

*/

public static final String SHARDING_COLUMNS = "column_name";

/**

* 添加指定分片表的后缀

*/

private static final String[] SPECIAL_NODES = new String[]{"0sp", "1sp"};

// ds0.table_three_$->{((0..9).collect{t -> t.toString()} << ['0sp','1sp']).flatten()}

public static final String ACTUAL_DATA_NODES = "ds0." + LOGIC_TABLE_NAME + "_$->{((0..9).collect{t -> t.toString()} << "

+ "['" + SPECIAL_NODES[0] + "','" + SPECIAL_NODES[1] + "']"

+ ").flatten()}";

private static final List specialList0 = new ArrayList<>();

@Value("${special.table_three.sp0.ids:null}")

private void setSpecialList0(String ids) {

if (StringUtils.isBlank(ids)) {

return;

}

String[] idSplit = StringUtils.split(ids, ",");

for (String id : idSplit) {

String trimId = StringUtils.trim(id);

if (StringUtils.isEmpty(trimId)) {

continue;

}

specialList0.add(trimId);

}

}

private static final List specialList1 = new ArrayList<>();

@Value("${special.table_three.sp1.ids:null}")

private void setSpecialList1(String ids) {

if (StringUtils.isBlank(ids)) {

return;

}

String[] idSplit = StringUtils.split(ids, ",");

for (String id : idSplit) {

String trimId = StringUtils.trim(id);

if (StringUtils.isEmpty(trimId)) {

continue;

}

specialList1.add(trimId);

}

}

private static class SpecialActualTableNameHolder {

private static volatile Map specialActualTableNameMap = new HashMap<>();

static {

for (String specialId : specialList0) {

specialActualTableNameMap.put(specialId, LOGIC_TABLE_NAME + "_" + SPECIAL_NODES[0]);

}

for (String specialId : specialList1) {

specialActualTableNameMap.put(specialId, LOGIC_TABLE_NAME + "_" + SPECIAL_NODES[1]);

}

}

}

/**

* @return 指定ID的表名映射

*/

public static Map getSpecialActualTableNameMap() {

return SpecialActualTableNameHolder.specialActualTableNameMap;

}

}

ShardingAlgorithm接口的子接口除了ComplexKeysShardingAlgorithm,还有HintShardingAlgorithm,PreciseShardingAlgorithm,RangeShardingAlgorithm;本教程使用了更通用的ComplexKeysShardingAlgorithm接口。

配置TableRuleConfiguration类时,使用了两个参数的构造器

public TableRuleConfiguration(String logicTable, String actualDataNodes) {}

TableRuleConfiguration类还有一个参数的的构造器,没有实际数据节点,是给广播表用的

public TableRuleConfiguration(String logicTable) {}

groovy行表达式说明

ds0.table_three_$->{((0…9).collect{t -> t.toString()} << [‘0sp',‘1sp']).flatten()}

sharding-jdbc的groovy行表达式支持$->{…}或${…},为了避免与spring的占位符混淆,官方推荐使用$->{…}

(0..9) 获得0到9的集合

(0..9).collect{t -> t.toString()} 数值0到9的集合转换成字符串0到9的数组

(0..9).collect{t -> t.toString()} << ['0sp','1sp'] 字符串0到9的数组合并['0sp','1sp']数组,结果为 ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ['0sp','1sp']]

flatten() 扁平化数组,结果为 ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0sp', '1sp']

properties配置

#是否显示分表SQL,默认为false

spring.shardingsphere.props.sql.show=true

#指定哪些列值入指定的分片表,多个列值以“,”分隔

#column_name为9997,9998,9999的记录存入表table_three_0sp中

#column_name为1111,2222,3333,4444,5555的记录存入表table_three_1sp中

#其余的值哈希取模后,存入对应的table_three_模数表中

special.table_three.sp0.ids=9997,9998,9999

special.table_three.sp1.ids=1111,2222,3333,4444,5555

Sharding-jdbc的坑

任何SQL,只要select子句中包含动态参数,则抛出类型强转异常

禁止修改分片键,如果update的set子句中存在分片键,则不能执行sql

结语

至此,简单的单表分表策略就配置完成了

代码没有好坏,合适的就是最好的


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:网络Ping测试程序(怎么测试网络ping)
下一篇:华为网络设备与基础配置(华为网络设备与基础配置不匹配)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~