RocketMQ Canal 相关笔记

网友投稿 523 2022-11-06


RocketMQ Canal 相关笔记

RocketMQ安装启动

解压后直接进bin启动

nohup sh mqnamesrv & tail

broker配置conf

启动broker

nohup sh bin/mqbroker -n 192.168.1.73:9876 -c conf/broker.conf autoCreateTopicEnable=true &tail

查看

RocketMQ可视化界面

下载地址

​​clean package -Dmaven.test.skip=true

启动

java -jar rocketmq-console-ng-1.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=192.168.171.129:9876

Canal 安装部署

官方文档非常好用:​​​ topic

出现,RocketMQ 消息队列大量重复消息需配置 mysql-binlog

instance.properties中的配置

贴一下 canal.properties

在原配置文件追加配置

# ...# 可选项: tcp(默认), kafka, RocketMQcanal.serverMode = RocketMQ# ...# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 canal.mq.servers = 192.168.1.80:9876canal.mq.retries = 0# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限canal.mq.batchSize = 16384canal.mq.maxRequestSize = 1048576# flatMessage模式下请将该值改大, 建议50-200canal.mq.lingerMs = 1canal.mq.bufferMemory = 33554432# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)canal.mq.canalBatchSize = 50# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时canal.mq.canalGetTimeout = 100# 是否为flat json格式对象canal.mq.flatMessage = truecanal.mq.compressionType = nonecanal.mq.acks = all# kafka消息投递是否使用事务canal.mq.transaction = false

instance.properties

# enable gtid use true/falsecanal.instance.gtidon=false# position infocanal.instance.master.address= 192.168.1.12:3306canal.instance.master.journal.name=mysql-bin.000005canal.instance.master.position= 1081canal.instance.master.timestamp=canal.instance.master.gtid=# rds oss binlogcanal.instance.rds.accesskey=canal.instance.rds.secretkey=canal.instance.rds.instanceId=# table meta tsdb infocanal.instance.tsdb.enable=true#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb#canal.instance.tsdb.dbUsername=canal#canal.instance.tsdb.dbPassword=canal/#canal.instance.standby.address =#canal.instance.standby.journal.name =#canal.instance.standby.position =#canal.instance.standby.timestamp =#canal.instance.standby.gtid=# username/passwordcanal.instance.dbUsername=rootcanal.instance.dbPassword=rootcanal.instance.connectionCharset = UTF-8# enable druid Decrypt database passwordcanal.instance.enableDruid=false# table regexcanal.instance.filter.regex=.*\\..*# table black regexcanal.instance.filter.black.regex=mysql\\.slave_.*# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config################################################### mysql serverId , v1.0.26+ will autoGen# enable gtid use true/falsecanal.instance.gtidon=false# position infocanal.instance.master.address= 192.168.1.12:3306canal.instance.master.journal.name=mysql-bin.000005canal.instance.master.position= 1081canal.instance.master.timestamp=canal.instance.master.gtid=# rds oss binlogcanal.instance.rds.accesskey=canal.instance.rds.secretkey=canal.instance.rds.instanceId=# table meta tsdb infocanal.instance.tsdb.enable=true#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb#canal.instance.tsdb.dbUsername=canal#canal.instance.tsdb.dbPassword=canal/#canal.instance.standby.address =#canal.instance.standby.journal.name =#canal.instance.standby.position =#canal.instance.standby.timestamp =#canal.instance.standby.gtid=# username/passwordcanal.instance.dbUsername=rootcanal.instance.dbPassword=rootcanal.instance.connectionCharset = UTF-8# enable druid Decrypt database passwordcanal.instance.enableDruid=false#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regexcanal.instance.filter.regex=.*\\..*# table black regexcanal.instance.filter.black.regex=mysql\\.slave_.*# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config#canal.mq.topic=test-wy# dynamic topic route by schema or table regexcanal.mq.dynamicTopic=spark

可能用到的其他命令

show global variables like "%binlog_format%";

消费者(后续慢慢补充)

消费者使用内网内网内网!!!!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:上海公积金查询API(上海公积金查询app有哪些)
下一篇:判断是否是数独
相关文章

 发表评论

暂时没有评论,来抢沙发吧~