springboot整合rocketmq实现分布式事务

网友投稿 417 2022-10-22


springboot整合rocketmq实现分布式事务

1 执行流程

(1) 发送方向 MQ 服务端发送消息。

(2) MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。

(3) 发送方开始执行本地事务逻辑。

(4) 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。

(5) 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后MQ Server 将对该消息发起消息回查。

(6) 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

(7) 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

2 工程

2.1 pom

org.springframework.boot

spring-boot-starter-parent

2.3.0.RELEASE

1.8

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-test

test

org.projectlombok

lombok

com.alibaba

fastjson

1.2.71

org.apache.commons

commons-collections4

4.2

org.apache.commons

commons-lang3

org.springframework.boot

spring-boot-starter-logging

org.apache.rocketmq

rocketmq-spring-boot-starter

2.0.1

org.apache.rocketmq

rocketmq-client

4.3.2

org.springframework.boot

spring-boot-maven-plugin

2.3.0.RELEASE

org.apache.maven.plugins

maven-compiler-plugin

3.8.1

1.8

1.8

2.2 application.yml

rocketmq:

name-server: 192.168.38.50:9876

producer:

group: transcation-group

2.3 TransactionListenerImpl

@RocketMQTransactionListener(txProducerGroup = "transaction-producer-group")

@Slf4j

public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

private static Map STATE_MAP = new HashMap<>();

/**

* 执行业务逻辑

*/

@Override

public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {

String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);

try {

System.out.println("用户A账户减500元.");

System.out.println("用户B账户加500元.");

STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT);

return RocketMQLocalTransactionState.COMMIT;

} catch (Exception e) {

e.printStackTrace();

}

STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);

return RocketMQLocalTransactionState.UNKNOWN;

}

/**

* 回查

*/

@Override

public RocketMQLocalTransactionState checkLocalTransaction(Message message) {

String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTIONhttp://_ID);

log.info("回查消息 -> transId ={} , state = {}", transId, STATE_MAP.get(transId));

return STATE_MAP.get(transId);

}

}

2.4 SpringTransactionProducer

@Component

@Slf4j

publicQpTss class SpringTransactionProducer {

@Autowired

private RocketMQTemplate rocketMQTemplate;

/**

* 发送消息

*

*/

public void sendMsg(String topic, String msg) {

Message message = MessageBuilder.withPayload(msg).build();

this.rocketMQTemplate.sendMessageInTransaction("transaction-producer-group", topic, message, null);

log.info("发送成功");

}

}

2.5 SpringTxConsumer

@Component

@RocketMQMessageListener(topic = "pay_topic",

consumerGroup = "transaction-consumer-group",

selectorExpression = "*")

@Slf4j

public class SpringTxConsumehttp://r implements RocketMQListener {

@Override

public void onMessage(String msg) {

log.info("接收到消息 -> {}", msg);

}

}

2.6 ProducerController

@RestController

@RequestMapping("/producer")

public class ProducerController {

@Autowired

private SpringTransactionProducer springTransactionProducer;

@GetMapping("/sendMsg")

public String sendMsg() {

springTransactionProducer.sendMsg("pay_topic", "用户A账户减500元,用户B账户加500元。");

return "发送成功";

}

}

2.7 RocketApplication

@SpringBootApplication

public class RocketApplication {

public static void main(String[] args) {

SpringApplication.run(RocketApplication.class);

}

}

3 测试

3.1 正常消费测试

描述: 正常启动及可。

3.2 回查代码测试

描述: 执行本地事务时添加异常,重启测试,发现消费者没有收到消息。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:17.4.3 使用MulticastSocket实现多点广播(4)
下一篇:17.4.3 使用MulticastSocket实现多点广播(5)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~