小蔡学Java

项目二总结:(八)调度任务及运单合并去重

2024-02-25 15:57 1304 0 项目 Redis数据结构消息消费幂等性RabbitMQ

智能调度

在物流项目中,采用智能调度的方式对车辆任务、快递员的取派件任务进行调度管理,这样更加有效的进行管理,降低企业运营成本。

为什么需要调度

可能你会这样的疑问,用户下单了,快递员上门取件,取件后送回网点,网点有车辆运走,再经过车辆的一系列的运输,最后进行派件,对方就能收到快件,不就是这么简单的流程吗?为什么需要调度? 没错,看起来是简单的流程,实际上通过仔细的分析就会发现这个过程并不简单,甚至会非常的复杂,比如:

● 用户下单后,应该哪个网点的快递员上门呢?

○ 这样就需要通过用户的发件人地址信息定位到所属服务范围内的网点进行服务

○ 紧接着又会有一个问题,确定了网点后,这个网点有多个快递员,这个取件任务应该是指派谁呢?

○ 这里就需要对快递员的工作情况以及排班情况进行分析,才能确定哪个快递员进行服务。

● 快递员把快件拿回到网点后,假设这个快件是从上海寄往北京的,是网点直接开车送到北京吗?

○ 显然不是的,直接送的话成本太高了,怎么样成本最低呢?显然是车辆尽可能的满载,集中化运输(这个车上装的都是从A点→B点的快件,这里的A和B可能代表的网点或转运中心,而非全路线)

○ 一般物流公司会有很多的车辆、路线、司机,而每个路线都会设置不同的车次,如何能够将快件合理的分配到车辆上,分配时需要参考车辆的载重、司机的排班,车辆的状态以及车次等信息

● 快件到收件人地址所在服务范围内的网点了,系统该如何分配快递员?

● 还有一些其他的情况,比如:快件拒收应该怎么处理?车辆故障不能使用怎么处理?一车多个司机,运输任务是如何划分?等等

● 基于以上的问题分析,这就需要系统进行计算处理,这就是我们所说的【智能调度系统】,就是让整个物流流程中的参与者都通过系统的计算,可以井然有序的工作。

整体核心业务流程

关键流程说明: ● 用户下单后,会产生取件任务,该任务也是由调度中心进行调度的

● 订单转运单后,会发送消息到调度中心,在调度中心中对相同节点的运单进行合并(这里是指最小转运单元)

● 调度中心同样也会对派件任务进行调度,用于生成快递员的派件任务

● 司机的出库和入库操作也是流程中的核心动作,尤其是入库操作,是推动运单流转的关键

订单转运单

快递员上门取件成功后,会将订单转成运单,后续将进行一系列的转运流程。

运单表结构

揽收成功的消息

订单转运单的业务的触发点在于快递员的揽收成功,这个通过是通过消息传递的,之所以通过消息是为了减少系统间的耦合度。

消费消息

    /**
     * 快递员取件成功
     *
     * @param msg 消息
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = Constants.MQ.Queues.WORK_COURIER_PICKUP_SUCCESS),
            exchange = @Exchange(name = Constants.MQ.Exchanges.COURIER, type = ExchangeTypes.TOPIC),
            key = Constants.MQ.RoutingKeys.COURIER_PICKUP
    ))
    public void listenCourierPickupMsg(String msg) {
        log.info("接收到快递员取件成功的消息 >>> msg = {}", msg);
        //解析消息
        CourierMsg courierMsg = JSONUtil.toBean(msg, CourierMsg.class);

        //订单转运单
        this.transportOrderService.orderToTransportOrder(courierMsg.getOrderId());

        //TODO 发送运单跟踪消息
    }

生成运单号

对于运单号的生成有特殊的要求,格式:SL+13位数字,例如:SL1000000000760,对于这个需求,如果采用MP提供的雪花id生成是19位,是不能满足需求的,所以我们需要自己生成id,并且要确保唯一不能重复。 在这里我们采用美团的Leaf作为id生成服务,其源码托管于GitHub,美团技术开发文档如下:

Leaf——美团点评分布式ID生成系统

关于这个ID的生成和具体的细节可以看我之前的文章: 美团的分布式ID方案leaf 号段模式 snowflake算法

想了解我对雪花算法的一点拙见可以看:SnowFlake雪花算法

重要接口

关于运单表的一些增删改查我这里不在赘述,下面展示几个核心的接口

更新状态

在更新运单状态时需要考虑三件事:

● 如果更新运单为拒收状态,需要将快递退回去,也就是原路返回

● 在更新状态时,需要同步更新物流信息,通过发送消息的方式完成(先TODO,后面实现)

● 更新状态后需要通知其他系统(消息通知)

    @Override
    public boolean updateStatus(List<String> ids, TransportOrderStatus transportOrderStatus) {
        if (CollUtil.isEmpty(ids)) {
            return false;
        }

        if (TransportOrderStatus.CREATED == transportOrderStatus) {
            //修改订单状态不能为 新建 状态
            throw new SLException(WorkExceptionEnum.TRANSPORT_ORDER_STATUS_NOT_CREATED);
        }

        List<TransportOrderEntity> transportOrderList;
        //判断是否为拒收状态,如果是拒收需要重新查询路线,将包裹逆向回去
        if (TransportOrderStatus.REJECTED == transportOrderStatus) {
            //查询运单列表
            transportOrderList = super.listByIds(ids);
            for (TransportOrderEntity transportOrderEntity : transportOrderList) {
                //设置为拒收运单
                transportOrderEntity.setIsRejection(true);
                //根据起始机构规划运输路线,这里要将起点和终点互换
                Long sendAgentId = transportOrderEntity.getEndAgencyId();//起始网点id
                Long receiveAgentId = transportOrderEntity.getStartAgencyId();//终点网点id

                //默认参与调度
                boolean isDispatch = true;
                if (ObjectUtil.equal(sendAgentId, receiveAgentId)) {
                    //相同节点,无需调度,直接生成派件任务
                    isDispatch = false;
                } else {
                    TransportLineNodeDTO transportLineNodeDTO = this.transportLineFeign.queryPathByDispatchMethod(sendAgentId, receiveAgentId);
                    if (ObjectUtil.hasEmpty(transportLineNodeDTO, transportLineNodeDTO.getNodeList())) {
                        throw new SLException(WorkExceptionEnum.TRANSPORT_LINE_NOT_FOUND);
                    }

                    //删除掉第一个机构,逆向回去的第一个节点就是当前所在节点
                    transportLineNodeDTO.getNodeList().remove(0);
                    transportOrderEntity.setSchedulingStatus(TransportOrderSchedulingStatus.TO_BE_SCHEDULED);//调度状态:待调度
                    transportOrderEntity.setCurrentAgencyId(sendAgentId);//当前所在机构id
                    transportOrderEntity.setNextAgencyId(transportLineNodeDTO.getNodeList().get(0).getId());//下一个机构id

                    //获取到原有节点信息
                    TransportLineNodeDTO transportLineNode = JSONUtil.toBean(transportOrderEntity.getTransportLine(), TransportLineNodeDTO.class);
                    //将逆向节点追加到节点列表中
                    transportLineNode.getNodeList().addAll(transportLineNodeDTO.getNodeList());
                    //合并成本
                    transportLineNode.setCost(NumberUtil.add(transportLineNode.getCost(), transportLineNodeDTO.getCost()));
                    transportOrderEntity.setTransportLine(JSONUtil.toJsonStr(transportLineNode));//完整的运输路线
                }
                transportOrderEntity.setStatus(TransportOrderStatus.REJECTED);

                if (isDispatch) {
                    //发送消息参与调度
                    this.sendTransportOrderMsgToDispatch(transportOrderEntity);
                } else {
                    //不需要调度,发送消息生成派件任务
                    transportOrderEntity.setStatus(TransportOrderStatus.ARRIVED_END);
                    this.sendDispatchTaskMsgToDispatch(transportOrderEntity);
                }
            }
        } else {
            //根据id列表封装成运单对象列表
            transportOrderList = ids.stream().map(id -> {
				// TODO 发送运单跟踪消息
                //封装运单对象
                TransportOrderEntity transportOrderEntity = new TransportOrderEntity();
                transportOrderEntity.setId(id);
                transportOrderEntity.setStatus(transportOrderStatus);
                return transportOrderEntity;
            }).collect(Collectors.toList());
        }

        //批量更新数据
        boolean result = super.updateBatchById(transportOrderList);

        //发消息通知其他系统运单状态的变化
        this.sendUpdateStatusMsg(ids, transportOrderStatus);

        return result;
    }

拒收退回的物流信息:

合并运单

运单在运输过程中,虽然快件的起点与终点都不一定相同,但是在中间转运环节有一些运输节点是相同的,如下:

可以看出,A→E与A→G的运单,在A→B和B→C的转运是相同的,所以在做任务调度时,首先要做的事情就是将相同转运的运单进行合并,以供后续调度中心进行调度。

合并之后的结果存储在哪里呢?我们可以想一下,后续处理的需求:

  • 先进行合并处理的运单按照顺序进行调度
  • 此次运单调度处理完成后就应该将其删除掉,表示已经处理完成

根据以上两点的需求,很容易想到队列的存储方式,先进先出,实现队列的技术方案有很多,在这里我们采用Redis的List作为队列将相同节点转运的订单放到同一个队列中,可以使用其LPUSH放进去,RPOP弹出数据,这样就可以确保先进先出,并且弹出后数据将删除,因此符合我们的需求。

代码实现

实现中,需要考虑消息的幂等性,防止重复数据的产生。

/**
 * 对于待调度运单消息的处理
 */
@Slf4j
@Component
public class TransportOrderDispatchMQListener {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = Constants.MQ.Queues.DISPATCH_MERGE_TRANSPORT_ORDER),
            exchange = @Exchange(name = Constants.MQ.Exchanges.TRANSPORT_ORDER_DELAYED, type = ExchangeTypes.TOPIC, delayed = Constants.MQ.DELAYED),
            key = Constants.MQ.RoutingKeys.JOIN_DISPATCH
    ))
    public void listenDispatchMsg(String msg) {
        // {"transportOrderId":"SL1000000000560","currentAgencyId":100280,"nextAgencyId":90001,"totalWeight":3.5,"totalVolume":2.1,"created":1652337676330}
        log.info("接收到新运单的消息 >>> msg = {}", msg);
        DispatchMsgDTO dispatchMsgDTO = JSONUtil.toBean(msg, DispatchMsgDTO.class);
        if (ObjectUtil.isEmpty(dispatchMsgDTO)) {
            return;
        }

        Long startId = dispatchMsgDTO.getCurrentAgencyId();
        Long endId = dispatchMsgDTO.getNextAgencyId();
        String transportOrderId = dispatchMsgDTO.getTransportOrderId();

        //消息幂等性处理,将相同起始节点的运单存放到set结构的redis中,在相应的运单处理完成后将其删除掉
        String setRedisKey = this.getSetRedisKey(startId, endId);
        if (this.stringRedisTemplate.opsForSet().isMember(setRedisKey, transportOrderId)) {
            //重复消息
            return;
        }

        //存储数据到redis,采用list结构,从左边写入数据,读取数据时从右边读取
        //key =>  DISPATCH_LIST_CurrentAgencyId_NextAgencyId
        //value =>  {"transportOrderId":111222, "totalVolume":0.8, "totalWeight":2.1, "created":111222223333}

        String listRedisKey = this.getListRedisKey(startId, endId);
        String value = JSONUtil.toJsonStr(MapUtil.builder()
                .put("transportOrderId", transportOrderId)
                .put("totalVolume", dispatchMsgDTO.getTotalVolume())
                .put("totalWeight", dispatchMsgDTO.getTotalWeight())
                .put("created", dispatchMsgDTO.getCreated()).build()
        );

        //存储到redis
        this.stringRedisTemplate.opsForList().leftPush(listRedisKey, value);
        this.stringRedisTemplate.opsForSet().add(setRedisKey, transportOrderId);
    }

    public String getListRedisKey(Long startId, Long endId) {
        return StrUtil.format("DISPATCH_LIST_{}_{}", startId, endId);
    }

    public String getSetRedisKey(Long startId, Long endId) {
        return StrUtil.format("DISPATCH_SET_{}_{}", startId, endId);
    }

}

测试

将DispatchApplication启动后,观察RabbitMQ服务,发现mergeTransportOrder队列已经绑定到transportOrder.delayed交换机

测试方法:

在work微服务中的测试用例进行订单转运单的操作,让其发出消息,在dispatch微服务中进行断点跟踪,最终数据存储到了redis:

模拟面试

● 物流项目中你参与了核心的功能开发吗?能说一下核心的业务逻辑吗?

● 你们的运单号是怎么生成的?如何确保性能?

● 能说一下订单转运单的业务逻辑吗?生成运单后如何与调度中心整合在一起的?

● 合并运单为什么使用Redis的List作为队列?如何确保消息的幂等性的?

评论( 0 )

  • 博主 Mr Cai
  • 坐标 河南 信阳
  • 标签 Java、SpringBoot、消息中间件、Web、Code爱好者

文章目录