任务调度
分析
通过前面的实现,已经将相同转运节点的写入到了Redis的队列中,谁来处理呢?这就需要调度任务进行处理了,基本的思路是:
查询待分配任务的车辆 -> 计算运力 -> 分配运单 -> 生成运输任务 -> 生成司机作业单
也就是说,调度是站在车辆角度推进的。
处理具体的处理业务流程如下:
车辆计划表结构
车辆调度流程
实现
这里采用的是xxl-job的分片式任务调度,主要目的是为了并行多处理车辆,提升调度处理效率。
调度入口
@Resource
private TransportOrderDispatchMQListener transportOrderDispatchMQListener;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private TruckPlanFeign truckPlanFeign;
@Resource
private MQFeign mqFeign;
@Value("${sl.volume.ratio:0.95}")
private Double volumeRatio;
@Value("${sl.weight.ratio:0.95}")
private Double weightRatio;
/**
* 分片广播方式处理运单,生成运输任务
*/
@XxlJob("transportTask")
public void transportTask() {
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
//根据分片参数 2小时内并且可用状态车辆
// List<TruckPlanDto> truckDtoList = this.queryTruckPlanDtoList(shardIndex, shardTotal);
List<TruckPlanDto> truckDtoList = this.truckPlanFeign.pullUnassignedPlan(shardTotal, shardIndex);
if (CollUtil.isEmpty(truckDtoList)) {
return;
}
// 对每一个车辆都进行处理
for (TruckPlanDto truckPlanDto : truckDtoList) {
//校验车辆计划对象
if (ObjectUtil.hasEmpty(truckPlanDto.getStartOrganId(), truckPlanDto.getEndOrganId(),
truckPlanDto.getTransportTripsId(), truckPlanDto.getId())) {
log.error("车辆计划对象数据不符合要求, truckPlanDto -> {}", truckPlanDto);
continue;
}
//根据该车辆的开始、结束机构id,来确定要处理的运单数据集合
Long startOrganId = truckPlanDto.getStartOrganId();
Long endOrganId = truckPlanDto.getEndOrganId();
String redisKey = this.transportOrderDispatchMQListener.getListRedisKey(startOrganId, endOrganId);
List<DispatchMsgDTO> dispatchMsgDTOList = new ArrayList<>();
//计算车辆运力、合并运单
this.executeTransportTask(redisKey, truckPlanDto.getTruckDto(), dispatchMsgDTOList);
//生成运输任务
this.createTransportTask(truckPlanDto, startOrganId, endOrganId, dispatchMsgDTOList);
}
//发送消息通过车辆已经完成调度
this.completeTruckPlan(truckDtoList);
}
运单处理
递归处理运单,需要考虑到车辆的运力:
private void executeTransportTask(String redisKey, TruckDto truckDto, List<DispatchMsgDTO> dispatchMsgDTOList) {
String redisData = this.stringRedisTemplate.opsForList().rightPop(redisKey);
if (StrUtil.isEmpty(redisData)) {
//该车辆没有运单需要运输
return;
}
DispatchMsgDTO dispatchMsgDTO = JSONUtil.toBean(redisData, DispatchMsgDTO.class);
//计算该车辆已经分配的运单,是否超出其运力,载重 或 体积超出,需要将新拿到的运单加进去后进行比较
BigDecimal totalWeight = NumberUtil.add(NumberUtil.toBigDecimal(dispatchMsgDTOList.stream()
.mapToDouble(DispatchMsgDTO::getTotalWeight)
.sum()), dispatchMsgDTO.getTotalWeight());
BigDecimal totalVolume = NumberUtil.add(NumberUtil.toBigDecimal(dispatchMsgDTOList.stream()
.mapToDouble(DispatchMsgDTO::getTotalVolume)
.sum()), dispatchMsgDTO.getTotalVolume());
//车辆最大的容积和载重要留有余量,否则可能会超重 或 装不下
BigDecimal maxAllowableLoad = NumberUtil.mul(truckDto.getAllowableLoad(), weightRatio);
BigDecimal maxAllowableVolume = NumberUtil.mul(truckDto.getAllowableVolume(), volumeRatio);
if (NumberUtil.isGreaterOrEqual(totalWeight, maxAllowableLoad)
|| NumberUtil.isGreaterOrEqual(totalVolume, maxAllowableVolume)) {
//超出车辆运力,需要取货的运单再放回去,放到最右边,以便保证运单处理的顺序
this.stringRedisTemplate.opsForList().rightPush(redisKey, redisData);
return;
}
//没有超出运力,将该运单加入到集合中
dispatchMsgDTOList.add(dispatchMsgDTO);
//递归处理运单
executeTransportTask(redisKey, truckDto, dispatchMsgDTOList);
}
消息通知生成运输任务
private void createTransportTask(TruckPlanDto truckPlanDto, Long startOrganId, Long endOrganId, List<DispatchMsgDTO> dispatchMsgDTOList) {
//将运单合并的结果以消息的方式发送出去
//key-> 车辆id,value -> 运单id列表
//{"driverId":123, "truckPlanId":456, "truckId":1210114964812075008,"totalVolume":4.2,"endOrganId":90001,"totalWeight":7,"transportOrderIdList":[320733749248,420733749248],"startOrganId":100280}
List<String> transportOrderIdList = CollUtil.getFieldValues(dispatchMsgDTOList, "transportOrderId", String.class);
//司机列表确保不为null
List<Long> driverIds = CollUtil.isNotEmpty(truckPlanDto.getDriverIds()) ? truckPlanDto.getDriverIds() : ListUtil.empty();
Map<String, Object> msgResult = MapUtil.<String, Object>builder()
.put("truckId", truckPlanDto.getTruckId()) //车辆id
.put("driverIds", driverIds) //司机id
.put("truckPlanId", truckPlanDto.getId()) //车辆计划id
.put("transportTripsId", truckPlanDto.getTransportTripsId()) //车次id
.put("startOrganId", startOrganId) //开始机构id
.put("endOrganId", endOrganId) //结束机构id
//运单id列表
.put("transportOrderIdList", transportOrderIdList)
//总重量
.put("totalWeight", dispatchMsgDTOList.stream()
.mapToDouble(DispatchMsgDTO::getTotalWeight)
.sum())
//总体积
.put("totalVolume", dispatchMsgDTOList.stream()
.mapToDouble(DispatchMsgDTO::getTotalVolume)
.sum())
.build();
//发送消息
String jsonMsg = JSONUtil.toJsonStr(msgResult);
this.mqFeign.sendMsg(Constants.MQ.Exchanges.TRANSPORT_TASK,
Constants.MQ.RoutingKeys.TRANSPORT_TASK_CREATE, jsonMsg);
if (CollUtil.isNotEmpty(transportOrderIdList)) {
//删除redis中set存储的运单数据
String setRedisKey = this.transportOrderDispatchMQListener.getSetRedisKey(startOrganId, endOrganId);
this.stringRedisTemplate.opsForSet().remove(setRedisKey, transportOrderIdList.toArray());
}
}
消息通知完成车辆计划
private void completeTruckPlan(List<TruckPlanDto> truckDtoList) {
//{"ids":[1,2,3], "created":123456}
Map<String, Object> msg = MapUtil.<String, Object>builder()
.put("ids", CollUtil.getFieldValues(truckDtoList, "id", Long.class))
.put("created", System.currentTimeMillis()).build();
String jsonMsg = JSONUtil.toJsonStr(msg);
//发送消息
this.mqFeign.sendMsg(Constants.MQ.Exchanges.TRUCK_PLAN,
Constants.MQ.RoutingKeys.TRUCK_PLAN_COMPLETE, jsonMsg);
}
xxl-job任务
创建任务,任务的分发方式为分片式调度(每5分钟执行一次):
点击执行一次,进行测试看是否会触发代码的执行:
完整测试
现在就可以查询到车次数据,通过xxl-job的调度进行测试。 车辆计划数据的状态要为1,调度状态为0,才能获取到车次数据。
Redis中存在队列数据:
获取到数据:
调度执行后,redis数据被处理掉了,车辆的计划调度状态也改为了【已调度】状态:
另外,生成运输任务的消息已经发出了,只是我们还没有监听处理消息,在后面我们将实现对该消息的处理,就可以生成运输任务了。
评论( 0 )