前言
背景
现状
在早期的版本中,考虑独立出来一个服务用于专门与算法交互,如下图:
随着业务的发展,很多同学反应这样不方便,因为有的逻辑在业务系统做过一次,但在调用算法脚本前,shell服务中会根据双边约定的输入参数再做一次组装(如OSS附件的下载,需要构建完整的绝对路径,业务系统可能做过一次下载,但在shell服务无法读取业务系统的目录),不同的业务对应的算法不同,loadbalance按轮询的方式调用shell服务,有可能一个round下来,第一个还没完,新的请求又进来了,导致服务器忙死,跑太久还有可能会导致feign超时,为了一步步解决这些问题,先将调用算法的代码做成公共的基础工具类,业务系统引用相应的maven。如下图:
//解决feign超时配置
feign.client.config.default.connectTimeout = 160000000
feign.client.config.default.readTimeout = 160000000
feign.client.config.default.loggerLevel = basic
上述方案仍存在一些问题:
a. 框架在调用算法功能的时候会通过执行本服务器shell命令的方式来执行,这样服务器会另外起一个算法的进程,此进程不属于JVM管控范围。无法管理内存和cpu使用。
b. 在服务器内存不足的情况下,算法内存使用无法分配足够的内存,同时并不会自动解决相关问题。导致服务器长时间内存不足,服务器进入死机状态,影响业务的处理。
c. 现有算法内存使用预期不明确、并发进程数量不明确导致运维不能合理安排服务器。
d. 现有的kill docker的方案会引起服务上其他业务的中断、如果不kill出问题的docker,会引起服务器死机,导致服务器上全部服务出问题。影响面扩大
e. 算法进程不支持传参控制进程
f. 某个进程使用了过量的内存,并且可能已经造成了服务器死机,我们还不清楚是哪个进程无法push算法帮忙优化
如何解决
本文主要解决loadbalance 轮询策略下,目标实例不一定满足算法运行所需导致的问题。
当前每个服务在配置文件中都有基本的元数据信息,通过nacos注册中心可以看到:
spring.cloud.nacos.discovery.metadata.management.context-path = ${server.servlet.context-path}/actuator
spring.cloud.nacos.discovery.metadata.user.password = ${password}
spring.cloud.nacos.discovery.metadata.user.name = ${username}
spring.cloud.nacos.discovery.server-addr = ${ip:port}
spring.cloud.nacos.discovery.namespace = xxx_dev
spring.cloud.nacos.discovery.metadata.metricpath = ${server.servlet.context-path}/actuator/prometheus
spring.cloud.nacos.discovery.ephemeral = false
spring.cloud.nacos.discovery.metadata.healthpath = ${server.servlet.context-path}/health
@Bean
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.enabled", matchIfMissing = true)
public NacosDiscoveryProperties nacosProperties() {
NacosDiscoveryProperties nacosDiscoveryProperties = new NacosDiscoveryProperties();
nacosDiscoveryProperties.getMetadata()//
.put("startup.time", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
nacosDiscoveryProperties.getMetadata().put("git.version", readGitProperties());
return nacosDiscoveryProperties;
}
如果我们能获得运行算法的每个实例自身的cpu、总内存和空闲内存等数据,那就可以在此基础上实现“分流”。查阅相关资料,Nacos提供了对应的API可以修改元数据所以,重新做一个服务,实现自定义的loadbalance规则,如下图:
大致实现逻辑:在已发布的《SpringBoot集成Prometheus》,《Java JVM对象实例个数和空间占用大小 》等文章中已经演示过如何定时采集信息并上报。用同样的方法,做一个Job放到common tools,业务方升级后可以获取当前实例的资源信息,然后我们将这些信息通过调用Nacos Open API进行实例的修改,然后在服务收到请求后通过Open API查询某个服务的所有实例,解析对应的元数据,哪个空闲就调用哪个。
步骤说明
参考源码:com.alibaba.nacos.api.naming.pojo.Instance,先定义一个实例对象包含要采集的属性,在gitlab上存在一个项目:javasysmon,它提供了java对于机器状态,进程状态的获取
<!--maven依赖-->
<dependency>
<groupId>com.jezhumble.javasysmon</groupId>
<artifactId>javasysmon</artifactId>
<version>1.0.1</version>
</dependency>
#配置修改和查询 nacos 服务的api ,其它配置是为了灵活管理
tools.jvm.monitor.enable = true
tools.remote-shell.enable_custom_rule = true
tools.remote-shell.nacos_namespace_id = xxx_sit
tools.remote-shell.nacos_server_url = http://ip:port/
tools.remote-shell.nacos_instance_list_api = nacos/v1/ns/instance/list?namespaceId=%s&serviceName=%s&healthyOnly=true
tools.remote-shell.nacos_instance_edit_api = nacos/v1/ns/instance
tools.remote-shell.nacos_metadata_label_key = algo_label
tools.remote-shell.nacos_metadata_cpu_usage_key = jvm_cpu
tools.remote-shell.nacos_metadata_left_mem_key = jvm_left_mem
tools.remote-shell.enable_nacos_metadata_filter = false
@Configuration
@EnableAsync
@EnableScheduling
@ConditionalOnProperty(value = "tools.jvm.monitor.enable",havingValue = "true")
@Component("jvmMonitorRunner")
public class JVMMonitorRunner {
private Logger log = LoggerFactory.getLogger(this.getClass());
@Value("${server.port}")
private Integer SERVER_PORT;
@Value("${spring.application.name}")
private String SPRING_APPLICATION_NAME;
private final JavaSysMon javaSysMon;
@Autowired
private RemoteShellProperties remoteShellProperties;
public JVMMonitorRunner() {
javaSysMon = new JavaSysMon();
}
public static final Long ONE_MB = 1048576L;
/******************************************************************/
@Async("jvmMonitorExecutor")
@Scheduled(cron = "${xpilot.tools.jvm.monitor.cron:0/1 * * * * ?}")
public void jvmMonitor() {
try {
if (!remoteShellProperties.getEnable_custom_rule()) {
return;
}
if(!remoteShellProperties.getEnable_nacos_metadata_filter()){
log.debug("enable_nacos_metadata_filter is false,skip jvmMonitor");
return;
}
if(StringUtils.isEmpty(remoteShellProperties.getNacos_server_url()) || StringUtils.isEmpty(remoteShellProperties.getNacos_instance_edit_api())){
throw new RuntimeException("Nacos server url or instance edit api is empty");
}
long total = javaSysMon.physical().getTotalBytes() / ONE_MB;
long left = javaSysMon.physical().getAvailableBytes() / ONE_MB;
long now = total - left;
float cpu = this.getCpuUsage();
String property = System.getProperty(remoteShellProperties.getNacos_metadata_label_key());
log.debug("jvmMonitor info total:{},left:{},now:{},cpu:{}" ,total,left,now,cpu);
String url = remoteShellProperties.getNacos_server_url() + remoteShellProperties.getNacos_instance_edit_api();
//String editUrl = "http://8.130.126.187:8257/nacos/v1/ns/instance";
//构建元数据
Map<String,Object> metadata = new HashMap<>(1<<2);
metadata.put(remoteShellProperties.getNacos_metadata_left_mem_key(),left);
metadata.put(remoteShellProperties.getNacos_metadata_cpu_usage_key(),cpu);
metadata.put(remoteShellProperties.getNacos_metadata_label_key(),property);
//调用接口修改当前实例对应的元数据
Map<String,Object> params = new HashMap<>(1<<3);
params.put(SERVICE_KEY_NAMESPACE_ID,remoteShellProperties.getNacos_namespace_id());
params.put(SERVICE_KEY_SERVICE_NAME,SPRING_APPLICATION_NAME);
params.put(SERVICE_KEY_IP,InetAddress.getLocalHost().getHostAddress());
params.put(SERVICE_KEY_PORT,SERVER_PORT);
params.put(SERVICE_KEY_METADATA, JSON.toJSONString(metadata));
this.reportMetadata(url,params);
}catch (Exception e){
log.error("jvmMonitor error:", e);
}
}
@Retryable(backoff = @Backoff(delay = 2000,multiplier = 1))
private String reportMetadata(String url,Map<String,Object> params){
HttpResponse response = HttpRequest.put(url).
form(params).
timeout(NACOS_TIME_OUT).
execute();
String body = response.body();
log.debug("reportMetadata url:{} ,params:{},response:{}",url,params,body);
return body;
}
public double getJVMUsage() {
double jvmUsage = 0.0d;
try {
Runtime runtime = Runtime.getRuntime();
long total = runtime.totalMemory();
long free = runtime.freeMemory();
long used = total - free;
jvmUsage = used * 100.0f / total;
}catch (Exception e) {
//log.error("getJVMUsage error", e);
}
return jvmUsage;
}
public float getCpuUsage() {
CpuTimes cpuTimes = javaSysMon.cpuTimes();
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
log.error("", e);
return 1f;
}
return javaSysMon.cpuTimes().getCpuUsage(cpuTimes);
}
}
如果算法有多个版本,可以通过在启动参数中增加指定key用于表示要调用哪个版本的算法,不出意外,成功后可以通过注册中心看到如下情况:
根据传入的参数(如期望运行算法所需的内存大小、算法版本...)过滤出满足条件的实例,代码如下:
@Component
@Slf4j
public class RemoteLoadBalanceHelper {
@Autowired
private RemoteShellProperties remoteShellProperties;
@Autowired
private ObjectMapper objectMapper;
public LbResultVo getLbResultVo(Long expectedMemory, String label) throws ShellBusyException, LabelNotFoundException, ServiceNotFoundException {
//所有服务调用根据label+serviceName获取对应的服务 并且在里面负载均衡(使用随机策略)如果找不到服务返回异常 如果找到了但是内存都不足,返回忙碌异常
//step1.未打开配置
if (!remoteShellProperties.getEnable_custom_rule()) {
return new LbResultVo();
}
if(StringUtils.isEmpty(remoteShellProperties.getNacos_server_url()) || StringUtils.isEmpty(remoteShellProperties.getNacos_instance_list_api())){
throw new RuntimeException("Nacos server url or instance list api is empty");
}
String serviceName = remoteShellProperties.getServiceName();
//step2.查找当前运行环境对应的服务列表
//demo: nacos/v1/ns/instance/list?namespaceId=%s&serviceName=%s&healthyOnly=true
String url = remoteShellProperties.getNacos_server_url() + remoteShellProperties.getNacos_instance_list_api();
url = String.format(url, remoteShellProperties.getNacos_namespace_id(), serviceName);
HttpResponse response = HttpRequest.get(url).timeout(NACOS_TIME_OUT).execute();
//HttpResponse response = HttpRequest.get("http://ip:port/nacos/v1/ns/instance/list?namespaceId=cloud_mapping_sit&serviceName=" + serviceName+ "&healthyOnly=true").timeout(NACOS_TIME_OUT).execute();
if(response.getStatus() != HttpStatus.SC_OK){
log.error("Nacos request failed,code:{}",response.getStatus());
throw new NetworkException("Service return code:["+response.getStatus()+"] unavailable.Check Please");
}
JsonNode hosts = null;
try {
hosts = objectMapper.readTree(response.bodyBytes()).get(SERVICE_KEY_HOSTS);
} catch (IOException e) {
throw new RuntimeException(e);
}
if(hosts == null || hosts.isEmpty()){
throw new ServiceNotFoundException();
}
//step3.根据指定条件过滤列表
List<LbResultVo> instances = new ArrayList<>(hosts.size());
for (JsonNode host : hosts) {
JsonNode metadata = host.get(SERVICE_KEY_METADATA);
if(metadata == null){
continue;
}
JsonNode nodeIp = host.get(SERVICE_KEY_IP);
String ip = nodeIp == null ? BLANK : nodeIp.asText();
JsonNode nodePort = host.get(SERVICE_KEY_PORT);
int port = nodePort == null ? 0 : nodePort.asInt();
//case0.未开启元数据过滤 则所有的都作为可用实例
if(!remoteShellProperties.getEnable_nacos_metadata_filter()){
LbResultVo lbResultVo = new LbResultVo(ip, port);
lbResultVo.setServiceName(serviceName);
//默认值 代表无限大 未时使用过,避免shellBusyException
lbResultVo.setFreeMem(NACOS_SERVER_NORMAL);
lbResultVo.setTotalMem(NACOS_SERVER_NORMAL);
lbResultVo.setCpuUsage(0.00d);
instances.add(lbResultVo);
continue;
}
JsonNode nodeLeft = metadata.get(remoteShellProperties.getNacos_metadata_left_mem_key());
double left = nodeLeft == null ? 0.0d : nodeLeft.asDouble();
JsonNode nodeCpu = metadata.get(remoteShellProperties.getNacos_metadata_cpu_usage_key());
double cpu = nodeCpu == null ? 0.0d : nodeCpu.asDouble();
JsonNode nodeProperty = metadata.get(remoteShellProperties.getNacos_metadata_label_key());
String property = nodeProperty == null ? BLANK : nodeProperty.asText();
//case1.找出元数据中不包含指定key的实例或包含指定key但值为空的实例
if(StringUtils.isEmpty(label) && StringUtils.isEmpty(property)){
instances.add(new LbResultVo(left, cpu, serviceName, ip, port, property));
continue;
}
//case2.找出元数据中包含指定key的实例且值与传入值一致的实例
if(!property.equals(label)){
instances.add(new LbResultVo(left, cpu, serviceName, ip, port, property));
}
}
if(CollectionUtils.isEmpty(instances)){
throw new LabelNotFoundException();
}
List<LbResultVo> targets = instances.stream().filter(instance -> instance.getFreeMem() >= expectedMemory).collect(Collectors.toList());
if(CollectionUtils.isEmpty(targets)){
throw new ShellBusyException();
}
//step4.多个需要随机返回一个
return this.randomInstance(targets);
}
private LbResultVo randomInstance(List<LbResultVo> instances) {
if(instances.size() == 1){
return instances.get(0);
}
return new ArrayList<>(instances).get(new Random().nextInt(instances.size()));
}
}
上面的逻辑解决了按算法所需内存查询对应的实例,算是实现了一个小小的loadbalance。
其它方案
微服务架构下,注册中心维护着服务的健康和多实例间的loadbalance,但通常情况下,为服务申请的ECS资源都一样,所以loadbalance策略虽然有很多,默认不会调整策略,采用RoundRobinRule的方式来实现loadbalance。
目前接触的业务会频繁的跟算法交互,Java应用去调用算法同学提供的脚本(.sh 或 .py),再解析“输出”做后续的业务推进。
至于如何调用算法,大概代码如下:
ProcessBuilder processBuilder = new ProcessBuilder();
// /usr/bin/python3 /opt/scripts/xxx.py -i /xxx/xxx/input.json -o /xxx/xxx/output.json
processBuilder.command(commands);
Process process = processBuilder.start();
process.getInputStream()
更详细的在《批量更新SQL如何写》一篇最后有写,采用了模板方法的设计模式,感觉兴趣的可以跳过去快速看看。
这里更多的是记录一次学习成长的过程,其实理解下来就是需要弹性的管理服务,目前我们已经切为k8s管理。优点如下:
资源统一管理:节点所有资源(CPU、内存)作为资源池统一管理, 提供不同的算法程序运行。
资源充分利用:通过自定义配置(yaml)包括内存、CPU、优先级等配置实现丰富的调度策略,最大化利用节点资源。
自我修复:在节点失败的情况下,将运行的容器迁移到其他节点。
水平扩展:方便的添加节点, 就可以实现服务的水平扩展,扩容资源池。
存储编排:可以自动挂载所选择的存储系统,公开存储作为卷给Pod(不同的算法程
序)。
划水专用:
https://nacos.io/docs/latest/guide/user/open-api/#3.1
https://nacos.io/zh-cn/docs/open-api.html
https://blog.csdn.net/sD7O95O/article/details/121942841
纸上得来终觉浅,绝知此事要躬行
——陆游《冬夜读书示子聿》