前言
背景
望
了解结构:车道(lane)与车道边界(boundary)的对应关系为1:N;车道(lane)与中心线(center_line)的对应关系为1:1
预估数据量:lane数据量千万级,每个lane至少由2条左右boundary组成,所以数据量至少是lane表的 2倍,center_line与lane相当
下游约束:调用算法(脚本)需要的是boundary表中对应的geometry信息,要求lane与boundary的关系要自行维护正确且尽量小,否则影响时间
闻
目标数据整表查询出来放到内存肯定不行,加上后面要组装数据 OOM 跑不了
通过lane与boundary关系单一查询会导致boundary表的IO开销成倍增加,时间也不允许
算法部门的输出以文件方式提供,一个文件会有多个车道标识的中心线,需要解析出来转换为postgis支持的坐标系然后再刷库
问
与算法的同学沟通后得知脚本大致运行的时间且最终也是文件输出,那就反着以此为解决问题的出发点搞一个方案出来让大伙提意见:分批获取:limit #{size} offset #{offset}
构建出lane与boundary关系的Map
解析Map,再次limit #{size} offset #{offset} 扫描boundary提取geometry
构建出算法需要的数据格式存储到MQ
多线程分批消费MQ数据调用算法脚本
因为是分批次创建脚本输入,这就间接控制了脚本的输出也是分批次对应,解析出来后批量更新中心线数据表
切
结论
无论油车还是新能源汽车,都在软件方面加大投入,希望借助一些软件功能让消费者离下单更近一步。很多品牌的入门级代步车就已经有ACC、LCC等功能,在跑高速时能减轻用户疲劳、提升用户体验。如LCC的车道居中保持功能各品牌底层的实现又分为视觉算法或依赖图商提供的底图道路数据,但很多时候图商采集的数据历史久远或车道线比较模糊,所以汽车想要在辅助驾驶打开的情况下安全、平稳的运行,基于图商的数据二次处理就非常重要,新能源更是如此。
车辆在行驶时需要一条参考线RL(refer_line),用来判定行驶轨迹有没有偏离车道。最近接触一个业务,要求实现Lane(车道)中心线的计算和结果回写,需要尽快完成,下游系统还等着要。
最终版本完成后我本地跑一下没什么问题提交到测试环境,发现get key获取对应条件查询boundary这个过程中JVM堆的变化非常明显。后面用remove代表get成功降下来,因为与get方法一样,返回的是key对应的value,remove方法拿到值后可以释放资源
for (Map.Entry<String, LaneLeftAndRightDTO> entry : laneBoundMap.entrySet()) {
List<BoundaryDTO> bounds = entry.getValue().getLeft().stream().map(originBoundMap::remove).filter(Objects::nonNull).collect(Collectors.toList());
PolyLine2D leftPolyline = this.analysisGeoCoordinates(bounds);
//TODO rightPolyline logic ignore
laneLeftAndRightPloyLineDTOS.add(new LaneLeftAndRightPloyLineDTO(entry.getKey(),leftPolyline,rightPolyline));
}
打死不改版发布后多线程消费MQ时也触发一个灵异问题——因为与脚本通信输入输出都是文件的方式,在创建文件时目录我是以时间来创建,导致在多线程下时间相同,把不同的数据写到了同一个文件导致算法输出错误,解决方案是在异步之前生成对应的请求标识作为目录创建的一部分,问题得以解决
private void splitRunShellAndResetCenterLine(GenCenterLineReq req, Set<LaneLeftAndRightPloyLineDTO> laneLeftAndRightPloyLines) {
int from = 0,to = (laneLeftAndRightPloyLines.size() / req.getSize4file().intValue()) + 1;
Map<Integer,String> batchIds = this.generateNoRepeatId(to);//先维护批次信息
ConcurrentBlockingExecutor<Map.Entry<Integer,String>,Boolean> executor = new ConcurrentBlockingExecutor<>(batchIds.entrySet(),executorService);
executor.execute((entry,msg)->{
Integer index = entry.getKey();
List<LaneLeftAndRightPloyLineDTO> collect = laneLeftAndRightPloyLines.stream().skip(req.getSize4file() * index).limit(req.getSize4file().intValue()).collect(Collectors.toList());
//异步导致里时间会一样
//String batchId = DateUtil.formatDateToString(Calendar.getInstance().getTime(), DateUtil.YYYY_MM_DD_HH_MM_SS_SSS),
String input = JacksonUtil.simpleSerialize(collect),batchId = entry.getValue(); //batchIds.get(index)
GenCenterLineVo genCenterLineVo = new GenCenterLineVo(batchId, input);
//调用专门与C++或Python通信的服务
BaseResponseEntity<GenCenterLineVo> response = shellClient.execGenCenterLineLogic(genCenterLineVo);
//解析响应并更新DB
List<CenterLineBoundaryDTO> fileBody = JSON.parseArray(response.getData().getInput(), CenterLineBoundaryDTO.class);
int rows = centerLineMapper.batchUpdateCenterLineGeometry(fileBody, now());
return Boolean.TRUE;
});
return resp;
}
// 生成唯一标识
private Map<Integer,String> generateNoRepeatId(Integer size) {
Set<String> batchIds = new HashSet<>(size);
try {
do {
batchIds.add(DateUtil.formatDateToString(Calendar.getInstance().getTime(), DateUtil.YYYY_MM_DD_HH_MM_SS_SSS));
Thread.sleep(1L);
}while (batchIds.size() < size);
//构建轮询顺位对应的批次标识
AtomicInteger index = new AtomicInteger(0);
Map<Integer,String> IdMap = new HashMap<>(batchIds.size());
batchIds.stream().forEach(batchId->{
IdMap.put(index.getAndIncrement(),batchId);
});
return IdMap;
}catch (Exception ex){
logger.error("generateNoRepeatId exception",ex);
}
throw new BusinessException(CommonStateCode.ILLEGAL_STATUS.getCode(),"程序在生成请求唯一标识时出错了...");
}
<!-- 根据不同的入参构建批量更新SQL这个制胜关键,否则一行行写性能会很低-->
<update id="batchUpdateCenterLineGeometry">
update table_name
<trim prefix="set" suffixOverrides=",">
update_time = #{updateTime},
<if test="list!=null and list.size > 0">
<trim prefix="geometry =case" suffix="end,">
<foreach collection="list" item="item" index="index">
WHEN land_id=#{item.lane_id} THEN ST_GeomFromText(#{item.geometry,jdbcType=OTHER},4326)
</foreach>
</trim>
<!-- 这里可以同步更新其它列 -->
</if>
</trim>
<where>
<if test="list!=null and list.size > 0">
lane_id IN
<foreach collection="list" separator="," item="item" open="(" close=")">
#{item.lane_id}
</foreach>
</if>
AND is_deleted = false
</where>
</update>
为了灵活控制,构建请求参数时查询DB的limit size 和 算法批次写文件size分开控制 ,因为读DB的size可以大一写,但写要求小,否则生成的批量更新SQL会报超长错误;最终千万级数据的中心线刷完大概在50分钟左右可以完成。
P.S:工作中可能有的同学很少碰到Java调脚本语言,为方便读者参考,部分伪代码贴出来供大家学习;最后,希望我们中国也有坚持写到60的程序员,然后出规范统领世界。
// TDO 脚本服务使用模板方法实现,需要运维同学为该服务搭建相应的C或python的运行环境
public abstract class ShellRunTemplate {
//这个方法可以判断执行脚本的前置条件是否满足或创建相关的数据
protected void createFile(ShellRunVo shell) {
try (FileWriter fileWriter = new FileWriter(shell.getInputJson())) {
inputJsonPathFile.createNewFile();
fileWriter.write(inputJsonString);
} catch (Exception ex) {
new BusinessException(String.format("create inputJsonFile failed, inputJsonPath:[%s]", inputJsonPath));
}
}
//构建出与可执行扫命令行,需要注意每一个字符串放单独拆开放 如:
// /opt/xxx/xxx/xxs.sh -i input.json -o output.json
// /usr/bin/python3 /opt/xxx/xxx/xxs.py -i input.json -o output.json
public abstract String[] createCommands(ShellRunVo shellRunVo);
protected void commandsRun(String[] commands) {
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command(commands);
String commandStr = Arrays.toString(commands);
Process process;
try {
log.info("Commands: {}", commandStr);
process = processBuilder.start();
} catch (IOException e) {
throw new BusinessException(String.format("process start error, commands:[%s]", e));
}
ExecutorService executorService = Executors.newFixedThreadPool(5);
//记录脚本输出日志 方便排查
executorService.execute(() -> log(process.getInputStream(), "normal");
executorService.execute(() -> log(process.getErrorStream(),"error");
FutureTask<Integer> futureTask = new FutureTask(new ShellCallable(process));
executorService.submit(futureTask);
Integer exitValue = -1;
try {
exitValue = futureTask.get(60, imeUnit.MINUTES);
if (exitValue != 0) { //跟Linux一样,算法的退出码成功应该是0
log.warn("commandsRun failed, exitValue:{}", exitValue);
}
} catch (Exception exception) {
throw new BusinessException(String.format("run shell overtime, exceed times:[%s], command:[%s],exitValue:[%s]", 60, commandStr, exitValue));
} finally {
futureTask.cancel(true);
if (process != null) {
process.destroy();
}
if (executorService != null) {
executorService.shutdownNow();
}
}
}
private void log(InputStream inputStream, String logType) {
try (BufferedReader processBufferedInputStream = new BufferedReader(new InputStreamReader(inputStream))) {
String readLineStr;
while ((readLineStr = processBufferedInputStream.readLine()) != null) {
log.info("run {} shell log: [{}]", logType, readLineStr);
}
} catch (Exception e) {
if(e.getMessage().equalsIgnoreCase("Stream closed")){
log.info("Stream closed.");
return;
}
log.error("asyncPrintErrorStreamLog error", e);
}
}
//外部重写该模板方法后调用该run即可
protected void run(ShellRunVo shellRunVo) {
this.createFile(shellRunVo);
commandsRun(this.createCommands(shellRunVo));
}
有问题欢迎指出,听说吹水与关注更配哦!!!