为什么需要调度?
可能你会这样的疑问,用户下单了,快递员上门取件,取件后送回网点,网点有车辆运走,再经过车辆的一系列的运输,最后进行派件,对方就能收到快件,不就是这么简单的流程吗?为什么需要调度?
没错,看起来是简单的流程,实际上通过仔细的分析就会发现这个过程并不简单,甚至会非常的复杂,比如:
● 用户下单后,应该哪个网点的快递员上门呢?
○ 这样就需要通过用户的发件人地址信息定位到所属服务范围内的网点进行服务
○ 紧接着又会有一个问题,确定了网点后,这个网点有多个快递员,这个取件任务应该是指派谁呢?
○ 这里就需要对快递员的工作情况以及排班情况进行分析,才能确定哪个快递员进行服务。
● 快递员把快件拿回到网点后,假设这个快件是从上海寄往北京的,是网点直接开车送到北京吗?
○ 显然不是的,直接送的话成本太高了,怎么样成本最低呢?显然是车辆尽可能的满载,集中化运输(这个车上装的都是从A点→B点的快件,这里的A和B可能代表的网点或转运中心,而非全路线)
○ 一般物流公司会有很多的车辆、路线、司机,而每个路线都会设置不同的车次,如何能够将快件合理的分配到车辆上,分配时需要参考车辆的载重、司机的排班,车辆的状态以及车次等信息
● 快件到收件人地址所在服务范围内的网点了,系统该如何分配快递员?
● 还有一些其他的情况,比如:快件拒收应该怎么处理?车辆故障不能使用怎么处理?一车多个司机,运输任务是如何划分?等等
● 基于以上的问题分析,这就需要系统进行计算处理,这就是我们所说的【智能调度系统】,就是让整个物流流程中的参与者都通过系统的计算,可以井然有序的工作。
整体核心业务流程
关键流程说明:
● 用户下单后,会产生取件任务,该任务也是由调度中心进行调度的
● 订单转运单后,会发送消息到调度中心,在调度中心中对相同节点的运单进行合并(这里是指最小转运单元)
● 调度中心同样也会对派件任务进行调度,用于生成快递员的派件任务
● 司机的出库和入库操作也是流程中的核心动作,尤其是入库操作,是推动运单流转的关键
取派件需求分析
快递员在登录到APP后,可以查看取派件任务列表:
表结构
对于快递员的取件和派件动作,除了类型不同外其他的属性基本都是一样的,所以我们可以将存储在一张表中。
取派件任务存储在sl_work数据库中
取件任务流程
用户在下单后,订单微服务会发消息出来,消息会在dispatch微服务中进行调度计算,最终会向work微服务发送消息,用于生成快递员的取件任务。
快递员取消取件任务的原因为【因个人无法取件,退回到网点】时,需要重新生成取件任务:
派件任务流程
派件任务会在两个场景下生成:
● 场景一,司机入库时,运单流转到最后一个节点,需要快递员派件
● 场景二,发件人与收件人的服务网点是同一个网点时,无需转运,直接生成派件任务
场景一:
场景二:
代码实现
@Override
@Transactional
public PickupDispatchTaskEntity saveTaskPickupDispatch(PickupDispatchTaskEntity taskPickupDispatch) {
// 设置任务状态为新任务
taskPickupDispatch.setStatus(PickupDispatchTaskStatus.NEW);
boolean result = super.save(taskPickupDispatch);
if (result) {
//同步快递员任务到es
this.syncCourierTask2Es(taskPickupDispatch);
//生成运单跟踪消息和快递员端取件/派件消息通知
this.generateMsg(taskPickupDispatch);
return taskPickupDispatch;
}
throw new SLException(WorkExceptionEnum.PICKUP_DISPATCH_TASK_SAVE_ERROR);
}
至于其他的接口都是对取件任务这张表的增删改查,这里便不在一一赘述
挑选几个比较有意义的点
改派快递员
场景:本来属于A快递员的取件任务,由于某种原因,A快递员不能执行,此时A快递员可以改派给其他快递员,会用到此接口。
另外,派件不支持直接改派,需要客服在后台操作。
@Override
public Boolean updateCourierId(List<Long> ids, Long originalCourierId, Long targetCourierId) {
// 1. 校验非空
if (ObjectUtil.hasEmpty(ids, targetCourierId, originalCourierId)) {
throw new SLException(WorkExceptionEnum.UPDATE_COURIER_PARAM_ERROR);
}
// 2. 校验 目标快递员 和 原快递员不可以是同一个人
if (ObjectUtil.equal(originalCourierId, targetCourierId)) {
throw new SLException(WorkExceptionEnum.UPDATE_COURIER_EQUAL_PARAM_ERROR);
}
// 3. 批量更新
LambdaUpdateWrapper<PickupDispatchTaskEntity> updateWrapper = Wrappers.<PickupDispatchTaskEntity>lambdaUpdate()
.in(PickupDispatchTaskEntity::getId, ids)
.set(PickupDispatchTaskEntity::getCourierId, targetCourierId)
.set(PickupDispatchTaskEntity::getAssignedStatus, PickupDispatchTaskAssignedStatus.DISTRIBUTED);
return super.update(updateWrapper);
}
更新取派件状态
实现更新取派件任务状态功能时,需要考虑如下几点:
● 更新的状态不能为【新任务】状态
● 更新状态为【已完成】并且任务类型为派件任务时,必须设置签收状态和签收人
● 更新状态为【已取消】,是取件任务的操作,根据不同的原因有不同的处理逻辑
○ 【因个人无法取件,退回到网点】,需要发送消息重新生成取件任务
○ 【用户取消投递】,需要取消订单
○ 其他原因(用户恶意下单、禁用品、重复下单等),需要关闭订单
快递员取消时选择的原因:
@GlobalTransactional
public Boolean updateStatus(PickupDispatchTaskDTO pickupDispatchTaskDTO) {
// 1. 校验参数
if (ObjectUtil.hasEmpty(pickupDispatchTaskDTO.getId(), pickupDispatchTaskDTO.getStatus())) {
throw new SLException(WorkExceptionEnum.PICKUP_DISPATCH_TASK_PARAM_ERROR);
}
// 2. 根据ID查询取派件任务
PickupDispatchTaskEntity pickupDispatchTask = super.getById(pickupDispatchTaskDTO.getId());
// 3. 判断取派件状态
switch (pickupDispatchTaskDTO.getStatus()) {
case NEW: {
// 3.1 修改不能是新增状态
throw new SLException(WorkExceptionEnum.PICKUP_DISPATCH_TASK_STATUS_NOT_NEW);
}
case COMPLETED: {
//3.2 完成状态 设置取派件为完成状态 设置完成时间 如果是派件任务 额外设置签收状态,如果是已签收 设置签收人
pickupDispatchTask.setStatus(PickupDispatchTaskStatus.COMPLETED);
//设置完成时间
pickupDispatchTask.setActualEndTime(LocalDateTime.now());
if (PickupDispatchTaskType.DISPATCH == pickupDispatchTask.getTaskType()) {
//如果是派件任务的完成,已签收需要设置签收状态和签收人,拒收只需要设置签收状态
pickupDispatchTask.setSignStatus(pickupDispatchTaskDTO.getSignStatus());
if (PickupDispatchTaskSignStatus.RECEIVED == pickupDispatchTaskDTO.getSignStatus()) {
pickupDispatchTask.setSignRecipient(pickupDispatchTaskDTO.getSignRecipient());
}
}
break;
}
case CANCELLED: {
// 3.3 如果是取消 设置取消信息
//任务取消
if (ObjectUtil.isEmpty(pickupDispatchTaskDTO.getCancelReason())) {
throw new SLException(WorkExceptionEnum.PICKUP_DISPATCH_TASK_PARAM_ERROR);
}
pickupDispatchTask.setStatus(PickupDispatchTaskStatus.CANCELLED);
pickupDispatchTask.setCancelReason(pickupDispatchTaskDTO.getCancelReason());
pickupDispatchTask.setCancelReasonDescription(pickupDispatchTaskDTO.getCancelReasonDescription());
pickupDispatchTask.setCancelTime(LocalDateTime.now());
// 如果是快递员主动取消任务 重新发送派件任务
if (pickupDispatchTaskDTO.getCancelReason() == PickupDispatchTaskCancelReason.RETURN_TO_AGENCY) {
//发送分配快递员派件任务的消息
//发送消息(取消任务发生在取件之前,没有运单,参数直接填入null)
this.sendPickupDispatchTaskMsgToDispatch(null,pickupDispatchTask);
} else if (pickupDispatchTaskDTO.getCancelReason() == PickupDispatchTaskCancelReason.CANCEL_BY_USER) {
//原因是用户取消,则订单状态改为取消
orderFeign.updateStatus(ListUtil.of(pickupDispatchTask.getOrderId()), OrderStatus.CANCELLED.getCode());
} else {
//其他原因则关闭订单
orderFeign.updateStatus(ListUtil.of(pickupDispatchTask.getOrderId()), OrderStatus.CLOSE.getCode());
}
break;
}
default: {
throw new SLException(WorkExceptionEnum.PICKUP_DISPATCH_TASK_PARAM_ERROR);
}
}
return super.updateById(pickupDispatchTask);
}
发消息的代码
@Resource
private MQFeign mqFeign;
/**
* 发送消息到调度中心,用于生成取派件任务
* @param transportOrder 运单
* @param pickupDispatchTask 取派件任务
*/
public void sendPickupDispatchTaskMsgToDispatch(TransportOrderEntity transportOrder, PickupDispatchTaskEntity pickupDispatchTask) {
OrderMsg orderMsg = OrderMsg.builder()
.agencyId(pickupDispatchTask.getAgencyId())
.orderId(pickupDispatchTask.getOrderId())
.created(DateUtil.current())
.taskType(PickupDispatchTaskType.PICKUP.getCode()) //取件任务
.mark(pickupDispatchTask.getMark())
.estimatedEndTime(pickupDispatchTask.getEstimatedEndTime()).build();
//查询订单对应的位置信息
OrderLocationDTO orderLocationDTO = this.orderFeign.findOrderLocationByOrderId(orderMsg.getOrderId());
//(1)运单为空:取件任务取消,取消原因为返回网点;重新调度位置取寄件人位置
//(2)运单不为空:生成的是派件任务,需要根据拒收状态判断位置是寄件人还是收件人
// 拒收:寄件人 其他:收件人
String location;
if (ObjectUtil.isEmpty(transportOrder)) {
location = orderLocationDTO.getSendLocation();
} else {
location = transportOrder.getIsRejection() ? orderLocationDTO.getSendLocation() : orderLocationDTO.getReceiveLocation();
}
Double[] coordinate = Convert.convert(Double[].class, StrUtil.split(location, ","));
Double longitude = coordinate[0];
Double latitude = coordinate[1];
//设置消息中的位置信息
orderMsg.setLongitude(longitude);
orderMsg.setLatitude(latitude);
//发送消息,用于生成取派件任务
this.mqFeign.sendMsg(Constants.MQ.Exchanges.ORDER_DELAYED, Constants.MQ.RoutingKeys.ORDER_CREATE,
orderMsg.toJson(), Constants.MQ.NORMAL_DELAY);
}
调度中心
调度中心微服务为快递员调度、车辆调度、运单车辆匹配的核心服务:
在调度中心中对于生成取派件任务的消息进行处理,消息内容类似这样
{
"orderId": 123,
"agencyId": 8001,
"taskType": 1,
"mark": "带包装",
"longitude": 116.111,
"latitude": 39.00,
"created": 1654224658728,
"info":"笔记本电脑",
"estimatedEndTime": 1654224658728
}
实现的关键点:
● 如果只查询到一个快递员,直接分配即可
● 如果是多个快递员,需要查询这些快递员当日的任务数,按照最少的进行分配,这样可以做到相对均衡
● 如果没有快递员,设置快递员id为空,可以在后台系统中,人为的进行调配快递员
● 对于取件任务而言,需要考虑用户选择的【期望上门时间】
○ 与当前时间相比,大于2小时发送延时消息,否则发送实时消息
/**
* 订单业务消息,接收到新订单后,根据快递员的负载情况,分配快递员
*/
@Slf4j
@Component
public class OrderMQListener {
@Resource
private MQFeign mqFeign;
@Resource
private CourierFeign courierFeign;
@Resource
private PickupDispatchTaskFeign pickupDispatchTaskFeign;
/**
* 如果有多个快递员,需要查询快递员今日的取派件数,根据此数量进行计算
* 计算的逻辑:优先分配取件任务少的,取件数相同的取第一个分配
* <p>
* 发送生成取件任务时需要计算时间差,如果小于2小时,实时发送;大于2小时,延时发送
* 举例:
* 1、现在10:30分,用户期望:11:00 ~ 12:00上门,实时发送
* 2、现在10:30分,用户期望:13:00 ~ 14:00上门,延时发送,12点发送消息,延时1.5小时发送
*
* @param msg 消息内容
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = Constants.MQ.Queues.DISPATCH_ORDER_TO_PICKUP_DISPATCH_TASK),
exchange = @Exchange(name = Constants.MQ.Exchanges.ORDER_DELAYED, type = ExchangeTypes.TOPIC, delayed = Constants.MQ.DELAYED),
key = Constants.MQ.RoutingKeys.ORDER_CREATE
))
public void listenOrderMsg(String msg) {
//{"orderId":123, "agencyId": 8001, "taskType":1, "mark":"带包装", "longitude":116.111, "latitude":39.00, "created":1654224658728, "estimatedStartTime": 1654224658728}
log.info("接收到订单的消息 >>> msg = {}", msg);
OrderMsg orderMsg = JSONUtil.toBean(msg, OrderMsg.class);
Long agencyId = orderMsg.getAgencyId(); //网点id
// 通过快递员微服务查询 可以为发件人服务的快递员(正常上班、服务范围内)
Double longitude = orderMsg.getLongitude();
Double latitude = orderMsg.getLatitude();
Long selectedCourierId = null;
List<Long> courierIds = this.courierFeign.queryCourierIdListByCondition(agencyId, longitude, latitude, LocalDateTimeUtil.toEpochMilli(orderMsg.getEstimatedEndTime()));
log.info("快递员微服务查出的ids:{}", courierIds);
if (CollUtil.isNotEmpty(courierIds)) {
//选中快递员
selectedCourierId = this.selectCourier(courierIds, orderMsg.getTaskType());
log.info("根据当日任务选出的快递员id:{}", selectedCourierId);
}
//发送消息
CourierTaskMsg courierTaskMsg = CourierTaskMsg.builder()
.courierId(selectedCourierId)
.agencyId(agencyId)
.taskType(orderMsg.getTaskType())
.orderId(orderMsg.getOrderId())
.mark(orderMsg.getMark())
.estimatedEndTime(orderMsg.getEstimatedEndTime())
.created(System.currentTimeMillis())
.build();
//计算时间差
long between = LocalDateTimeUtil.between(LocalDateTime.now(), orderMsg.getEstimatedEndTime(), ChronoUnit.MINUTES);
int delay = Constants.MQ.DEFAULT_DELAY; //默认实时发送
if (between > 120 && ObjectUtil.equal(orderMsg.getTaskType(), 1)) {
//计算延时时间,单位毫秒
LocalDateTime sendDataTime = LocalDateTimeUtil.offset(orderMsg.getEstimatedEndTime(), -2, ChronoUnit.HOURS);
delay = Convert.toInt(LocalDateTimeUtil.between(LocalDateTime.now(), sendDataTime, ChronoUnit.MILLIS));
}
this.mqFeign.sendMsg(Constants.MQ.Exchanges.PICKUP_DISPATCH_TASK_DELAYED,
Constants.MQ.RoutingKeys.PICKUP_DISPATCH_TASK_CREATE, courierTaskMsg.toJson(), delay);
}
/**
* 根据当日的任务数选取快递员
*
* @param courierIds 快递员列个表
* @param taskType 任务类型
* @return 选中的快递员id
*/
private Long selectCourier(List<Long> courierIds, Integer taskType) {
if (courierIds.size() == 1) {
return courierIds.get(0);
}
String date = DateUtil.date().toDateStr();
List<CourierTaskCountDTO> courierTaskCountDTOS = this.pickupDispatchTaskFeign
.findCountByCourierIds(courierIds, PickupDispatchTaskType.codeOf(taskType), date);
if (CollUtil.isEmpty(courierTaskCountDTOS)) {
//没有查到任务数量,默认给第一个快递员分配任务
return courierIds.get(0);
}
//查看任务数是否与快递员数相同,如果不相同需要补齐,设置任务数为0,这样就可以确保每个快递员都能分配到任务
if (ObjectUtil.notEqual(courierIds.size(), courierTaskCountDTOS.size())) {
List<CourierTaskCountDTO> dtoList = courierIds.stream()
.filter(courierId -> {
int index = CollUtil.indexOf(courierTaskCountDTOS, dto -> ObjectUtil.equal(courierId, dto.getCourierId()));
return index == -1;
})
.map(courierId -> CourierTaskCountDTO.builder()
.courierId(courierId)
.count(0L).build())
.collect(Collectors.toList());
//补齐到集合中
courierTaskCountDTOS.addAll(dtoList);
}
//选中任务数最小的快递员进行分配
CollUtil.sortByProperty(courierTaskCountDTOS, "count");
return courierTaskCountDTOS.get(0).getCourierId();
}
}
测试
对于OrderMQListener的测试,需要启动必要的服务,因为需要查询快递员(不查询也可以,就是无快递员的逻辑),此时就需要确保有快递员数据(确保在服务范围内或有所在机构的快递员),准备完成后,先进行单元测试,后面再进行整合测试。
用户下单成功后
查询到快递员:
延时发送
wok中接收到调度中心发来的消息:
生成任务
在work微服务中可以接收到来自调度中心的消息,接下来,我们需要编写消费消息的逻辑,生成快递员的取派件任务。
/**
* 生成快递员取派件任务
*
* @param msg 消息
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = Constants.MQ.Queues.WORK_PICKUP_DISPATCH_TASK_CREATE),
exchange = @Exchange(name = Constants.MQ.Exchanges.PICKUP_DISPATCH_TASK_DELAYED, type = ExchangeTypes.TOPIC, delayed = Constants.MQ.DELAYED),
key = Constants.MQ.RoutingKeys.PICKUP_DISPATCH_TASK_CREATE
))
public void listenCourierTaskMsg(String msg) {
//{"taskType":1,"orderId":225125208064,"created":1654767899885,"courierId":1001,"agencyId":8001,"estimatedStartTime":1654224658728,"mark":"带包装"}
log.info("接收到快递员任务的消息 >>> msg = {}", msg);
//解析消息
CourierTaskMsg courierTaskMsg = JSONUtil.toBean(msg, CourierTaskMsg.class);
//幂等性处理:判断订单对应的取派件任务是否存在,判断条件:订单号+任务状态
List<PickupDispatchTaskEntity> list = this.pickupDispatchTaskService.findByOrderId(courierTaskMsg.getOrderId(), PickupDispatchTaskType.codeOf(courierTaskMsg.getTaskType()));
for (PickupDispatchTaskEntity pickupDispatchTaskEntity : list) {
if (pickupDispatchTaskEntity.getStatus() == PickupDispatchTaskStatus.NEW) {
//消息重复消费
return;
}
}
// 订单不存在 不进行调度
OrderDTO orderDTO = orderFeign.findById(courierTaskMsg.getOrderId());
if (ObjectUtil.isEmpty(orderDTO)) {
return;
}
// 如果已经取消或者删除 则不进行调度
if (orderDTO.getStatus().equals(OrderStatus.CANCELLED.getCode()) || orderDTO.getStatus().equals(OrderStatus.DEL.getCode())) {
return;
}
PickupDispatchTaskEntity pickupDispatchTask = BeanUtil.toBean(courierTaskMsg, PickupDispatchTaskEntity.class);
//任务类型
pickupDispatchTask.setTaskType(PickupDispatchTaskType.codeOf(courierTaskMsg.getTaskType()));
//预计开始时间,结束时间向前推一小时
LocalDateTime estimatedStartTime = LocalDateTimeUtil.offset(pickupDispatchTask.getEstimatedEndTime(), -1, ChronoUnit.HOURS);
pickupDispatchTask.setEstimatedStartTime(estimatedStartTime);
// 默认未签收状态
pickupDispatchTask.setSignStatus(PickupDispatchTaskSignStatus.NOT_SIGNED);
//分配状态
if (ObjectUtil.isNotEmpty(pickupDispatchTask.getCourierId())) {
pickupDispatchTask.setAssignedStatus(PickupDispatchTaskAssignedStatus.DISTRIBUTED);
} else {
pickupDispatchTask.setAssignedStatus(PickupDispatchTaskAssignedStatus.MANUAL_DISTRIBUTED);
}
PickupDispatchTaskEntity result = this.pickupDispatchTaskService.saveTaskPickupDispatch(pickupDispatchTask);
if (result == null) {
//保存任务失败
throw new SLException(StrUtil.format("快递员任务保存失败 >>> msg = {}", msg));
}
}
评论( 0 )