资讯

精准传达 • 有效沟通

从品牌网站建设到网络营销策划,从策略到执行的一站式服务

RocketMQ如何解决分布式事务

本篇内容主要讲解“RocketMQ如何解决分布式事务”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“RocketMQ如何解决分布式事务”吧!

成都创新互联公司是专业的汉川网站建设公司,汉川接单;提供做网站、网站设计,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行汉川网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!

一致性如何保证:

RocketMQ解决分布式事务(可靠消息最终一致性方案)

1、A系统发送一个prepared消息到MQ,如果这个prepared消息发送失败那么就直接取消操作别执行了。

2、如果这个消息发送成功了、就接着执行本地事务(executeLocalTransaction),如果成功就告诉MQ发送确认消息,如果失败,就告诉MQ发送回滚消息。

3、如果发送了确认消息、那么B系统会接收到确认消息,然后执行本地事务。

4、上面的第2步, 由于网络原因发送确认or回滚消息失败,但是broker有轮询机制,根据唯一id查询本地事务状态,MQ会自动定时轮询所有prepared消息回调你的接口(checkLocalTransaction),问你,这个消息是不是本地事务处理失败了,所有没有发送确认的消息,是继续重试还是回滚?一版来说这里你就可以查下数据库看之前本地事务是否执行,如果回滚了,那么这里也回滚吧。这个就是避免可能本地事务执行成功了,而确认消息却发送失败了。

PS:此方案是不支持事务发起服务进行回滚的,但是大部分互联网应用都不会要求事务发起方进行回滚,如果一定要事务发起方进行回滚应该采用2PC、3PC、TCC等强一致性方案来实现分布式事务,比如LCN。

订单-库存-分布式事务

这里通过一个实例来讲一下RocketMQ实现分布式事务具体编码。

场景: 下单场景,订单服务生成订单,当订单支付成功之后,修改订单状态已支付,并且要通知库存服务进行库存的扣减。

数据库设计:

CREATE TABLE `yzy_order` (
  `id` int(11) NOT NULL,
  `order_id` varchar(100) NOT NULL DEFAULT '' COMMENT '订单id',
  `buy_num` int(11) DEFAULT NULL COMMENT '购买数量',
  `good_id` int(11) DEFAULT NULL COMMENT '商品ID',
  `user_id` int(11) DEFAULT NULL COMMENT '用户ID',
  `pay_status` int(11) DEFAULT NULL COMMENT '支付状态,0:没有支付,1:已经支付',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci




CREATE TABLE `yzy_repo` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `good_name` varchar(100) NOT NULL DEFAULT '' COMMENT '商品名称',
  `num` int(11) NOT NULL DEFAULT '0' COMMENT '库存数量',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='测试,库存表表'


开始实战

订单服务service的主要方法

package com.transaction.order;

import com.alibaba.dubbo.config.annotation.Reference;
import com.transaction.repository.IRepositoryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.List;

@Service
public class OrderService {
    @Autowired
    OrderDao orderDao;

    public final int PAY_DONE = 1;

    /**
     *  检查订单是否存在并且状态是支付完成
    **/
    public boolean checkOrderPaySuccess(String orderId){
        List allOrders = orderDao.findAll();
        return  allOrders.stream()
                .anyMatch(order -> order.getOrderId().equals(orderId) && order.getPayStatus() == PAY_DONE);
    }

 /**
     *  更新订单是为支付完成
    **/
    public void updatePayStatusByOrderId(String orderId){
        orderDao.updatePayStatusByOrderId(orderId, PAY_DONE);
    }

 /**
     *  生成订单,状态默认是未支付
    **/

    public void save(String orderId, int num, int goodId,int userId) {

        YzyOrder yzyOrder = new YzyOrder();
        yzyOrder.setOrderId(orderId);
        yzyOrder.setBuyNum(num);
        yzyOrder.setGoodId(goodId);
        yzyOrder.setUserId(userId);

        orderDao.save(yzyOrder);
    }
}

业务流程

1.在订单表创建一个状态是未支付的订单

 在终端或者浏览器 执行  curl '127.0.0.1:8081/order/save?num=2&good_id=1&user_id=1001' 

 /**
     * 生成订单接口
     * @param num
     * @param goodId
     * @param userId
     * @return
     */
    @GetMapping("save")
    public String makeOrder(
            @RequestParam("num") int num,
            @RequestParam("good_id") int goodId,
            @RequestParam("user_id") int userId) {

        orderService.save(UUID.randomUUID().toString(), num, goodId,userId);
        return "success";
    }

2.用户支付完成,通过MQ通知库存服务扣减库存

OrderController:pay 发送订单支付成功的MQ事务消息,这里注意体会,并不是直接调用OrderService::updatePayStatusByOrderId 然后发送普通的MQ消息。而是先发送事务消息到MQ,然后MQ回调订单服务的TransactionListener::executeLocalTransaction,在这里完成订单状态的更新,保证发送事务消息和更新订单状态的一致性.

  @GetMapping("pay")
    public String pay(@RequestParam("order_id") String orderId)
            throws UnsupportedEncodingException, MQClientException, JsonProcessingException {
        transactionProducer.sendOrderPaySucessEvent(orderId);
        return "success";
    }

3.订单服务端的事务消息监听器

@Component
public class TransactionProducer implements InitializingBean {

    private TransactionMQProducer producer;

    @Autowired
    private OrderService orderService;

    @Autowired
    private OrderDao orderDao;

    @Override
    public void afterPropertiesSet() throws Exception {
        producer = new TransactionMQProducer("order-pay-group");
        producer.setNamesrvAddr("mq01.stag.kk.srv:9876;mq02.stag.kk.srv:9876");
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("transaction-thread-name-%s").build();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), threadFactory);
        producer.setExecutorService(executor);
        //设置发送消息的回调
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 根据消息发送的结果 判断是否执行本地事务
             *
             * 回调该方法的时候说明 消息已经成功发送到了MQ,可以把订单状态更新为 "支付成功"
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 根据本地事务执行成与否判断 事务消息是否需要commit与 rollback
                ObjectMapper objectMapper = new ObjectMapper();
                LocalTransactionState state = LocalTransactionState.UNKNOW;
                try {
                    OrderRecord record = objectMapper.readValue(msg.getBody(), OrderRecord.class);

                    //MQ已经收到了TransactionProducer send方法发送的事务消息,下面执行本地的事务
                    //本地记录订单信息
                    orderService.updatePayStatusByOrderId(record.getOrderId());

                    state = LocalTransactionState.COMMIT_MESSAGE;
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    state = LocalTransactionState.ROLLBACK_MESSAGE;
                } catch (IOException e) {
                    e.printStackTrace();
                    state = LocalTransactionState.ROLLBACK_MESSAGE;
                }
                return state;
            }
            /**
             * RocketMQ 回调 根据本地事务是否执行成功 告诉broker 此消息是否投递成功
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                ObjectMapper objectMapper = new ObjectMapper();
                LocalTransactionState state = LocalTransactionState.UNKNOW;
                OrderRecord record = null;
                try {
                    record = objectMapper.readValue(msg.getBody(), OrderRecord.class);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                try {
                    //根据是否有transaction_id对应转账记录 来判断事务是否执行成功
                    boolean isLocalSuccess = orderService.checkOrderPaySuccess(record.getOrderId());

                    if (isLocalSuccess) {
                        state = LocalTransactionState.COMMIT_MESSAGE;
                    } else {
                        state = LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return state;
            }
        });
        producer.start();
    }

    public void sendOrderPaySucessEvent(String orderId) throws JsonProcessingException, UnsupportedEncodingException, MQClientException {
        ObjectMapper objectMapper = new ObjectMapper();
        YzyOrder order = orderDao.findAll().stream()
                .filter(item->item.getOrderId().equals(orderId))
                .collect(Collectors.toList()).get(0);
        if(order == null){
            System.out.println("not found order " + orderId);
        }
        // 构造发送的事务 消息
        OrderRecord record = new OrderRecord();
        record.setUserId(order.getUserId());
        record.setOrderId(orderId);
        record.setBuyNum(order.getBuyNum());
        record.setPayStatus(order.getPayStatus());
        record.setGoodId(order.getGoodId());

        Message message = new Message("Order-Success", "", record.getOrderId(),
                objectMapper.writeValueAsString(record).getBytes(RemotingHelper.DEFAULT_CHARSET));

        TransactionSendResult result = producer.sendMessageInTransaction(message, null);
        System.out.println("发送事务消息 ,orderId = " + record.getOrderId() + " " + result.toString());
    }
}

4.库存服务扣减库存

需要注意的问题:

1.  扣减库存要防止在并发的情况下被扣成负数

2. 先select后update的方式更新库存要加分布式锁或者数据库乐观锁,update语句需要是幂等的

   UPDATE t_yue SET money=$new_money WHERE id=$good_id AND money=$old_money;

3. 注意通过msgId或者orderId来进行消费幂等处理

 @Override
    public int reduce(Integer buyNum, Integer goodId) {

        //并发的情况下,为了防止库存被扣成负数,有三种解决方案
        //1. select for update (必须放到事务中)
        //2. 这段逻辑加上分布式锁
        //3. 数据库加上一个version字段,乐观锁

        while (true){
            Optional repoOption = repositoryDao.findById(goodId);
            if (!repoOption.isPresent()) {
                return 0;
            }

            YzyRepo repo = repoOption.get();

            //避免数据库库存扣减小于零
            if (repo.getNum() - buyNum < 0) {
                return -1;
            }
            repo.setNum(repo.getNum() - buyNum);

            int affect = repositoryDao.updateGoodNum(repo.getNum() - buyNum, repo.getNum(), goodId);
            if(affect > 0){
                return affect;
            }
        }
    }

到此,相信大家对“RocketMQ如何解决分布式事务”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


当前标题:RocketMQ如何解决分布式事务
文章路径:http://www.cdkjz.cn/article/jdpgee.html
多年建站经验

多一份参考,总有益处

联系快上网,免费获得专属《策划方案》及报价

咨询相关问题或预约面谈,可以通过以下方式与我们联系

大客户专线   成都:13518219792   座机:028-86922220