Flink流处理引擎零基础速通之数据的抽取篇

网友投稿 604 2022-07-29


目录一、CDC二、常见CDC的比较三、Flink CDC四、Flink CDC支持的数据库五、阿里实现的FlinkCDC使用示例依赖引入基于table基于sql总结

一、CDC

CDC (Change Data Capture) ,在广义的概念上,只要能捕获数据变更的技术,都可以称为 CDC 。但通常我们说的CDC 技术主要面向数据库(包括常见的mysql,Oracle, MongoDB等)的变更,是一种用于捕获数据库中数据变更的技术。

二、常见CDC的比较

常见的主要包括Flink CDC,DataX,Canal,Sqoop,Kettle,Oracle Goldengate,Debezium等。

DataX,Sqoop和kettle的CDC实现技术主要是基于查询的方式实现的,通过离线调度查询作业,实现批处理请求。这种作业方式无法保证数据的一致性,实时性也较差。Flink CDC,Canal,Debezium和Oracle Goldengate是基于日志的CDC技术。这种技术,利用流处理的方式,实时处理日志数据,保证了数据的一致性,为其他服务提供了实时数据。

三、Flink CDC

2020年 Flink cdc 首次在 Flink forward 大会上官宣, 由 Jark Wu & Qingsheng Ren 两位大佬提出。

Flink CDC connector 可以捕获在一个或多个表中发生的所有变更。该模式通常有一个前记录和一个后记录。Flink CDC connector 可以直接在Flink中以非约束模式(流)使用,而不需要使用类似 kafka 之类的中间件中转数据。

四、Flink CDC支持的数据库

PS:

Flink CDC 2.2才新增OceanBase,PolarDB-X,SqlServer,TiDB 四种数据源接入,均支持全量和增量一体化同步。

截止到目前FlinkCDC已经支持12+数据源。

五、阿里实现的FlinkCDC使用示例

依赖引入

org.apache.flink

flink-table-api-java

${flink.version}

org.apache.flink

flink-table-api-java-bridge_${scala.binary.version}

${flink.version}

org.apache.flink

flink-table-planner-blink_${scala.binary.version}

${flink.version}

com.alibaba.ververica

flink-connector-mysql-cdc

1.4.0

mysql

mysql-connector-java

8.0.28

com.alibaba

fastjson

1.2.80

com.fasterxml.jackson.core

jackson-core

${jackson.version}

com.fasterxml.jackson.core

jackson-databind

${jackson.version}

com.fasterxml.jackson.module

<artifactId>jackson-module-parameter-names

${jackson.version}

基于table

package spendreport.cdc;

import com.alibaba.fastjson.JSONObject;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;

import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;

import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;

import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

import io.debezium.data.Envelope;

import java.util.List;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.streaming.api.CheckpointingMode;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.CheckpointConfig;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.util.Collector;

import org.apache.kafka.connect.data.Field;

import org.apache.kafka.connect.data.Struct;

import org.apache.kafka.connect.source.SourceRecord;

;

/**

* @author zhengwen

**/

public class TestMySqlFlinkCDC {

public static void main(String[] args) throws Exception {

//1.创建执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

//2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传, 需要从 Checkpoint 或者 Savepoint 启动程序

//2.1 开启 Checkpoint,每隔 5 秒钟做一次 CK

env.enableCheckpointing(5000L);

//2.2 指定 CK 的一致性语义

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

//2.3 设置任务关闭的时候保留最后一次 CK 数据

env.getCheckpointConfig().enableExternalizedCheckpoints(

CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

//2.4 指定从 CK 自动重启策略

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));

DebeziumSourceFunction sourceFunction = MySQLSource.builder()

.hostname("127.0.0.1")

.serverTimeZone("GMT+8") //时区报错增加这个设置

.port(3306)

.username("root")

.password("123456")

.databaseList("wz")

.tableList("wz.user_info") //注意表一定要写库名.表名这种,多个,隔开

.startupOptions(StartupOptions.initial())

//自定义转json格式化

.deserializer(new MyJsonDebeziumDeserializationSchema())

//自带string格式序列化

//.deserializer(new StringDebeziumDeserializationSchema())

.build();

DataStreamSource streamSource = env.addSource(sourceFunction);

//TODO 可以keyBy,比如根据table或type,然后开窗处理

//3.打印数据

streamSource.print();

//streamSource.addSink(); 输出

//4.执行任务

env.execute("flinkTableCDC");

}

private static class MyJsonDebeziumDeserializationSchema implements

com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema {

@Overrhttp://ide

public void deserialize(SourceRecord sourceRecord, Collector collector)

throws Exception {

Struct value = (Struct) sourceRecord.value();

Struct source = value.getStruct("source");

//获取数据库名称

String db = source.getString("db");

String table = source.getString("table");

//获取数据类型

String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();

if (type.equals("create")) {

type = "insert";

}

JSONObject jsonObject = new JSONObject();

jsonObject.put("database", db);

jsonObject.put("table", table);

jsonObject.put("type", type);

//获取数据data

Struct after = value.getStruct("after");

JSONObject dataJson = new JSONObject();

List fields = after.schema().fields();

for (Field field : fields) {

String field_name = field.name();

Object fieldValue = after.get(field);

dataJson.put(field_name, fieldValue);

}

jsonObject.put("data", dataJson);

collector.collect(JSONObject.toJSONString(jsonObject));

}

@Override

public TypeInformation getProducedType() {

return BasicTypeInfo.STRING_TYPE_INFO;

}

}

}

运行效果

PS:

操作数据库的增删改就会立马触发这里是自定义的序列化转json格式字符串,自带的字符串序列化也是可以的(可以自己试试打印的内容)

基于sql

package spendreport.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**

* @author zhengwen

**/

public class TestMySqlFlinkCDC2 {

public static void main(String[] args) throws Exception {

//1.创建执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

//2.创建 Flink-MySQL-CDC 的 Source

String connectorName = "mysql-cdc";

String dbHostName = "127.0.0.1";

String dbPort = "3306";

String dbUsername = "root";

String dbPassword = "123456";

String dbDatabaseName = "wz";

String dbTableName = "user_info";

String tableSql = "CREATE TABLE t_user_info ("

+ "id int,mobile varchar(20),"

+ "user_name varchar(30),"

+ "real_name varchar(60),"

+ "id_card varchar(20),"

+ "org_name varchar(100),"

+ "user_stars int,"

+ "create_by int,"

// + "create_time datetime,"

+ "update_by int,"

// + "update_time datetime,"

+ "is_deleted int) "

+ " WITH ("

+ " 'connector' = '" + connectorName + "',"

+ " 'hostname' = '" + dbHostName + "',"

+ " 'port' = '" + dbPort + "',"

+ " 'username' = '" + dbUsername + "',"

+ " 'password' = '" + dbPassword + "',"

+ " 'database-name' = '" + dbDatabaseName + "',"

+ " 'table-name' = '" + dbTableName + "'"

+ ")";

tableEnv.executeSql(tableSql);

tableEnv.executeSql("select * from t_user_info").print();

env.execute();

}

}

运行效果:

总结

既然是基于日志,那么数据库的配置文件肯定要开启日志功能,这里mysql需要开启内容

server-id=1log_bin=mysql-binbinlog_format=ROW  #目前还只能支持行expire_logs_days=30binlog_do_db=wz #这里binlog的库如果有多个就再写一行,千万不要写成用,隔开

实时性确实高,比那些自动任务定时取体验号百倍流示的确实丝滑

最后肯定证明这种方式同步数据可行,而且实时性特高,但是就是不知道我们的目标数据库是否可以开启这些日志配置。UP!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:SpringBoot超详细讲解集成Flink的部署与打包方法(springboot整合flink)
下一篇:Java实现聊天室界面(java制作聊天界面)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~