小蔡学Java

项目二总结:(十)运输任务值司机入库出库

2024-03-01 16:24 1020 0 项目 消息消费幂等性分布式事务分布式锁

运输任务

运输任务是针对于车辆的一次运输生成的,每一个运输任务都有对应的司机作业单。 例如:张三发了一个从北京金燕龙营业部发往上海浦东航头营业部的快递,它的转运路线是:金燕龙营业部 → 昌平区分拣中心 → 北京转运中心 → 上海转运中心 → 浦东区分拣中心 → 航头营业部,在此次的转运中一共会产生5个运输任务和至少10个司机作业单(一个车辆至少配备2个司机)。 需要注意的是,一个运输任务中包含了多个运单,就是一辆车拉了一车的快件,是一对多的关系。

表结构

运输任务在work微服务中,主要涉及到2张表,分别是:sl_transport_task(运输任务表)、sl_transport_order_task(运输任务与运单关系表)。司机作业单是存储在司机微服务中的sl_driver_job(司机作业单)表中。

司机作业单

编码实现

监听消息

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = Constants.MQ.Queues.WORK_TRANSPORT_TASK_CREATE),
            exchange = @Exchange(name = Constants.MQ.Exchanges.TRANSPORT_TASK, type = ExchangeTypes.TOPIC),
            key = Constants.MQ.RoutingKeys.TRANSPORT_TASK_CREATE
    ))
    public void listenTransportTaskMsg(String msg) {
        //解析消息 {"driverIds":[123,345], "truckPlanId":456, "truckId":1210114964812075008,"totalVolume":4.2,"endOrganId":90001,"totalWeight":7,"transportOrderIdList":[320733749248,420733749248],"startOrganId":100280}
        JSONObject jsonObject = JSONUtil.parseObj(msg);
        //获取到司机id列表
        JSONArray driverIds = jsonObject.getJSONArray("driverIds");
        // 分配状态
        TransportTaskAssignedStatus assignedStatus = CollUtil.isEmpty(driverIds) ? TransportTaskAssignedStatus.MANUAL_DISTRIBUTED : TransportTaskAssignedStatus.DISTRIBUTED;
        //创建运输任务
        Long transportTaskId = this.createTransportTask(jsonObject, assignedStatus);

        if (CollUtil.isEmpty(driverIds)) {
            log.info("生成司机作业单,司机列表为空,需要手动设置司机作业单 -> msg = {}", msg);
            return;
        }
        for (Object driverId : driverIds) {
            //生成司机作业单
            this.driverJobFeign.createDriverJob(transportTaskId, Convert.toLong(driverId));
        }
    }

创建运输任务

    @Transactional
    public Long createTransportTask(JSONObject jsonObject, TransportTaskAssignedStatus assignedStatus) {
        //根据车辆计划id查询预计发车时间和预计到达时间
        Long truckPlanId = jsonObject.getLong("truckPlanId");
        TruckPlanDto truckPlanDto = truckPlanFeign.findById(truckPlanId);

        //创建运输任务
        TransportTaskEntity transportTaskEntity = new TransportTaskEntity();
        transportTaskEntity.setTruckPlanId(jsonObject.getLong("truckPlanId"));
        transportTaskEntity.setTruckId(jsonObject.getLong("truckId"));
        transportTaskEntity.setStartAgencyId(jsonObject.getLong("startOrganId"));
        transportTaskEntity.setEndAgencyId(jsonObject.getLong("endOrganId"));
        transportTaskEntity.setTransportTripsId(jsonObject.getLong("transportTripsId"));
        transportTaskEntity.setAssignedStatus(assignedStatus); //任务分配状态
        transportTaskEntity.setPlanDepartureTime(truckPlanDto.getPlanDepartureTime()); //计划发车时间
        transportTaskEntity.setPlanArrivalTime(truckPlanDto.getPlanArrivalTime()); //计划到达时间
        transportTaskEntity.setStatus(TransportTaskStatus.PENDING); //设置运输任务状态

        // TODO 完善满载状态
        if (CollUtil.isEmpty(jsonObject.getJSONArray("transportOrderIdList"))) {
            transportTaskEntity.setLoadingStatus(TransportTaskLoadingStatus.EMPTY);
        } else {
            transportTaskEntity.setLoadingStatus(TransportTaskLoadingStatus.FULL);
        }

        //查询路线距离
        TransportLineSearchDTO transportLineSearchDTO = new TransportLineSearchDTO();
        transportLineSearchDTO.setPage(1);
        transportLineSearchDTO.setPageSize(1);
        transportLineSearchDTO.setStartOrganId(transportTaskEntity.getStartAgencyId());
        transportLineSearchDTO.setEndOrganId(transportTaskEntity.getEndAgencyId());
        PageResponse<TransportLineDTO> transportLineResponse = this.transportLineFeign.queryPageList(transportLineSearchDTO);
        TransportLineDTO transportLineDTO = CollUtil.getFirst(transportLineResponse.getItems());
        if (ObjectUtil.isNotEmpty(transportLineDTO)) {
            //设置距离
            transportTaskEntity.setDistance(transportLineDTO.getDistance());
        }

        //保存数据
        this.transportTaskService.save(transportTaskEntity);

        //创建运输任务与运单之间的关系
        this.createTransportOrderTask(transportTaskEntity.getId(), jsonObject);
        return transportTaskEntity.getId();
    }

创建运单关系

    private void createTransportOrderTask(final Long transportTaskId, final JSONObject jsonObject) {
        //创建运输任务与运单之间的关系
        JSONArray transportOrderIdList = jsonObject.getJSONArray("transportOrderIdList");
        if (CollUtil.isEmpty(transportOrderIdList)) {
            return;
        }

        //将运单id列表转成运单实体列表
        List<TransportOrderTaskEntity> resultList = transportOrderIdList.stream()
                .map(o -> {
                    TransportOrderTaskEntity transportOrderTaskEntity = new TransportOrderTaskEntity();
                    transportOrderTaskEntity.setTransportTaskId(transportTaskId);
                    transportOrderTaskEntity.setTransportOrderId(Convert.toStr(o));
                    return transportOrderTaskEntity;
                }).collect(Collectors.toList());

        //批量保存运输任务与运单的关联表
        this.transportOrderTaskService.batchSaveTransportOrder(resultList);

        //批量标记运单为已调度状态
        List<TransportOrderEntity> list = transportOrderIdList.stream()
                .map(o -> {
                    TransportOrderEntity transportOrderEntity = new TransportOrderEntity();
                    transportOrderEntity.setId(Convert.toStr(o));
                    //状态设置为已调度
                    transportOrderEntity.setSchedulingStatus(TransportOrderSchedulingStatus.SCHEDULED);
                    return transportOrderEntity;
                }).collect(Collectors.toList());
        this.transportOrderService.updateBatchById(list);
    }

测试

基于调度中心进行测试,需要sl-express-ms-dispatch-service、sl-express-ms-work-service、sl-express-ms-oms-service服务跑起来进行测试。 可以看到队列已经绑定到交换机:

经过测试发现已经生成了运输任务:

运输任务与运单的关系数据:

生成的司机作业单:

司机作业单对应的是两条数据,每个司机会有对应的一条作业单。

司机入库

司机入库业务是非常核心的业务,司机入库就意味着车辆入库,也就是此次运输结束,需要开始下一个运输、结束此次运输任务、完成司机作业单等操作。 司机入库的流程是在sl-express-ms-driver-service微服务中完成的,基本的逻辑已经实现,现在需要我们实现运单向下一个节点的转运,即:开始新的转运工作。

代码实现

    /**
     * 司机入库,修改运单的当前节点和下个节点 以及 修改运单为待调度状态,结束运输任务
     *
     * @param driverDeliverDTO 司机作业单id
     */
    @Override
    @GlobalTransactional
    public void intoStorage(DriverDeliverDTO driverDeliverDTO) {
        //1.司机作业单,获取运输任务id
        DriverJobEntity driverJob = super.getById(driverDeliverDTO.getId());
        if (ObjectUtil.isEmpty(driverJob)) {
            throw new SLException(DriverExceptionEnum.DRIVER_JOB_NOT_FOUND);
        }
        if (ObjectUtil.notEqual(driverJob.getStatus(), DriverJobStatus.PROCESSING)) {
            throw new SLException(DriverExceptionEnum.DRIVER_JOB_STATUS_UNKNOWN);
        }

        //运输任务id
        Long transportTaskId = driverJob.getTransportTaskId();

        //2.更新运输任务状态为完成
        //加锁,只能有一个司机操作,任务已经完成的话,就不需要进行流程流转,只要完成司机自己的作业单即可
        String lockRedisKey = Constants.LOCKS.DRIVER_JOB_LOCK_PREFIX + transportTaskId;
        //2.1获取锁
        RLock lock = this.redissonClient.getFairLock(lockRedisKey);
        if (lock.tryLock()) {
            //2.2获取到锁
            try {
                //2.3查询运输任务
                TransportTaskDTO transportTask = this.transportTaskFeign.findById(transportTaskId);
                //2.4判断任务是否已结束,不能再修改流转
                if (!ObjectUtil.equalsAny(transportTask.getStatus(), TransportTaskStatus.CANCELLED, TransportTaskStatus.COMPLETED)) {
                    //2.5修改运单流转节点,修改当前节点和下一个节点
                    this.transportOrderFeign.updateByTaskId(String.valueOf(transportTaskId));

                    //2.6结束运输任务
                    TransportTaskCompleteDTO transportTaskCompleteDTO = BeanUtil.toBean(driverDeliverDTO, TransportTaskCompleteDTO.class);
                    transportTaskCompleteDTO.setTransportTaskId(String.valueOf(transportTaskId));
                    this.transportTaskFeign.completeTransportTask(transportTaskCompleteDTO);
                }
            } finally {
                lock.unlock();
            }
        } else {
            throw new SLException(DriverExceptionEnum.DRIVER_JOB_INTO_STORAGE_ERROR);
        }

        //3.修改所有与运输任务id相关联的司机作业单状态和实际到达时间
        LambdaUpdateWrapper<DriverJobEntity> updateWrapper = new LambdaUpdateWrapper<>();
        updateWrapper.eq(ObjectUtil.isNotEmpty(transportTaskId), DriverJobEntity::getTransportTaskId, transportTaskId)
                .set(DriverJobEntity::getStatus, DriverJobStatus.DELIVERED)
                .set(DriverJobEntity::getActualArrivalTime, LocalDateTime.now());
        this.update(updateWrapper);
    }

运单流转

实现的关键点:

● 设置当前所在网点id为下一个网点id(司机入库,说明已经到达目的地)

● 解析完整运输链路,找出下一个转运节点,需要考虑到拒收、最后一个节点等情况

● 发送消息通知,参与新的调度或生成快递员的取派件任务

● 发送物流信息的消息(先TODO)

    @Override
    public boolean updateByTaskId(Long taskId) {
        //通过运输任务查询运单id列表
        List<String> transportOrderIdList = this.transportTaskService.queryTransportOrderIdListById(taskId);
        if (CollUtil.isEmpty(transportOrderIdList)) {
            return false;
        }
        //查询运单列表
        List<TransportOrderEntity> transportOrderList = super.listByIds(transportOrderIdList);
        for (TransportOrderEntity transportOrder : transportOrderList) {
            // TODO 发送物流信息
	
            //设置当前所在机构id为下一个机构id
            transportOrder.setCurrentAgencyId(transportOrder.getNextAgencyId());
            //解析完整的运输链路,找到下一个机构id
            String transportLine = transportOrder.getTransportLine();
            JSONObject jsonObject = JSONUtil.parseObj(transportLine);
            Long nextAgencyId = 0L;
            JSONArray nodeList = jsonObject.getJSONArray("nodeList");
            //这里反向循环主要是考虑到拒收的情况,路线中会存在相同的节点,始终可以查找到后面的节点
            //正常:A B C D E ,拒收:A B C D E D C B A
            for (int i = nodeList.size() - 1; i >= 0; i--) {
                JSONObject node = (JSONObject) nodeList.get(i);
                Long agencyId = node.getLong("bid");
                if (ObjectUtil.equal(agencyId, transportOrder.getCurrentAgencyId())) {
                    if (i == nodeList.size() - 1) {
                        //已经是最后一个节点了,也就是到最后一个机构了
                        nextAgencyId = agencyId;
                        transportOrder.setStatus(TransportOrderStatus.ARRIVED_END);
                        //发送消息更新状态
                        this.sendUpdateStatusMsg(ListUtil.toList(transportOrder.getId()), TransportOrderStatus.ARRIVED_END);
                    } else {
                        //后面还有节点
                        nextAgencyId = ((JSONObject) nodeList.get(i + 1)).getLong("bid");
                        //设置运单状态为待调度
                        transportOrder.setSchedulingStatus(TransportOrderSchedulingStatus.TO_BE_SCHEDULED);
                    }
                    break;
                }
            }
            //设置下一个节点id
            transportOrder.setNextAgencyId(nextAgencyId);

            //如果运单没有到达终点,需要发送消息到运单调度的交换机中
            //如果已经到达最终网点,需要发送消息,进行分配快递员作业
            if (ObjectUtil.notEqual(transportOrder.getStatus(), TransportOrderStatus.ARRIVED_END)) {
                this.sendTransportOrderMsgToDispatch(transportOrder);
            } else {
                //发送消息生成派件任务
                this.sendDispatchTaskMsgToDispatch(transportOrder);
            }
        }
        //批量更新运单
        return super.updateBatchById(transportOrderList);
    }

测试 编写测试用例:

 	@Test
    void updateByTaskId() {
        //设置运输任务id
        this.transportOrderService.updateByTaskId(1568165717632933889L);
    }

测试之前,观察当前机构、下个机构:

测试之后,发现调度状态、当前机构、下个机构都已经更新,并且会发送消息再次进行调度:

消息内容(sl.queue.dispatch.mergeTransportOrder):

模拟面试

● 能说一下xxl-job的分片式调度是在什么场景下使用的吗?这样做的好处是什么?

● 不同的运单流转节点是不一样的,你们如何将运单合并?如何确保redis的高可用?

● 你们系统中,车辆、车次和路线之间是什么关系?车辆有司机数量限制吗?

评论( 0 )

  • 博主 Mr Cai
  • 坐标 河南 信阳
  • 标签 Java、SpringBoot、消息中间件、Web、Code爱好者

文章目录