基于Nacos metadata 实现Loadbalance

文摘   职场   2024-10-14 11:40   江苏  
  1. 前言

  2. 微服务架构下,注册中心维护着服务的健康和多实例间的loadbalance,但通常情况下,为服务申请的ECS资源都一样,所以loadbalance策略虽然有很多,默认不会调整策略,采用RoundRobinRule的方式来实现loadbalance。


  3. 背景

  4. 目前接触的业务会频繁的跟算法交互,Java应用去调用算法同学提供的脚本(.sh 或 .py),再解析“输出”做后续的业务推进。

    至于如何调用算法,大概代码如下:

    ProcessBuilder processBuilder = new ProcessBuilder();// /usr/bin/python3 /opt/scripts/xxx.py -i /xxx/xxx/input.json -o /xxx/xxx/output.jsonprocessBuilder.command(commands);Process process = processBuilder.start();process.getInputStream()

    更详细的在《批量更新SQL如何写》一篇最后有写,采用了模板方法的设计模式,感觉兴趣的可以跳过去快速看看。


  5. 现状

    在早期的版本中,考虑独立出来一个服务用于专门与算法交互,如下图:

    随着业务的发展,很多同学反应这样不方便,因为有的逻辑在业务系统做过一次,但在调用算法脚本前,shell服务中会根据双边约定的输入参数再做一次组装(如OSS附件的下载,需要构建完整的绝对路径,业务系统可能做过一次下载,但在shell服务无法读取业务系统的目录),不同的业务对应的算法不同,loadbalance按轮询的方式调用shell服务,有可能一个round下来,第一个还没完,新的请求又进来了,导致服务器忙死,跑太久还有可能会导致feign超时,为了一步步解决这些问题,先将调用算法的代码做成公共的基础工具类,业务系统引用相应的maven。如下图:

    //解决feign超时配置feign.client.config.default.connectTimeout = 160000000feign.client.config.default.readTimeout = 160000000feign.client.config.default.loggerLevel = basic

    上述方案仍存在一些问题:

    a. 框架在调用算法功能的时候会通过执行本服务器shell命令的方式来执行,这样服务器会另外起一个算法的进程,此进程不属于JVM管控范围。无法管理内存和cpu使用。

    b. 在服务器内存不足的情况下,算法内存使用无法分配足够的内存,同时并不会自动解决相关问题。导致服务器长时间内存不足,服务器进入死机状态,影响业务的处理。

    c. 现有算法内存使用预期不明确、并发进程数量不明确导致运维不能合理安排服务器。

    d. 现有的kill docker的方案会引起服务上其他业务的中断、如果不kill出问题的docker,会引起服务器死机,导致服务器上全部服务出问题。影响面扩大

    e. 算法进程不支持传参控制进程

    f. 某个进程使用了过量的内存,并且可能已经造成了服务器死机,我们还不清楚是哪个进程无法push算法帮忙优化


  6. 如何解决

    本文主要解决loadbalance 轮询策略下,目标实例不一定满足算法运行所需导致的问题。

    当前每个服务在配置文件中都有基本的元数据信息,通过nacos注册中心可以看到:

    spring.cloud.nacos.discovery.metadata.management.context-path = ${server.servlet.context-path}/actuatorspring.cloud.nacos.discovery.metadata.user.password = ${password}spring.cloud.nacos.discovery.metadata.user.name =  ${username}spring.cloud.nacos.discovery.server-addr = ${ip:port}spring.cloud.nacos.discovery.namespace = xxx_devspring.cloud.nacos.discovery.metadata.metricpath = ${server.servlet.context-path}/actuator/prometheusspring.cloud.nacos.discovery.ephemeral = falsespring.cloud.nacos.discovery.metadata.healthpath = ${server.servlet.context-path}/health


    @Bean@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.enabled", matchIfMissing = true)public NacosDiscoveryProperties nacosProperties() {    NacosDiscoveryProperties nacosDiscoveryProperties = new NacosDiscoveryProperties();    nacosDiscoveryProperties.getMetadata()//            .put("startup.time", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));    nacosDiscoveryProperties.getMetadata().put("git.version", readGitProperties());    return nacosDiscoveryProperties;}


    如果我们能获得运行算法的每个实例自身的cpu、总内存和空闲内存等数据,那就可以在此基础上实现“分流”。查阅相关资料,Nacos提供了对应的API可以修改元数据所以,重新做一个服务,实现自定义的loadbalance规则,如下图:


    大致实现逻辑在已发布的《SpringBoot集成Prometheus》,Java JVM对象实例个数和空间占用大小 》等文章中已经演示过如何定时采集信息并上报。用同样的方法,做一个Job放到common tools,业务方升级后可以获取当前实例的资源信息,然后我们将这些信息通过调用Nacos Open API进行实例的修改,然后在服务收到请求后通过Open API查询某个服务的所有实例,解析对应的元数据,哪个空闲就调用哪个。


  7. 步骤说明

    参考源码:com.alibaba.nacos.api.naming.pojo.Instance,先定义一个实例对象包含要采集的属性,在gitlab上存在一个项目:javasysmon,它提供了java对于机器状态,进程状态的获取

    <!--maven依赖--><dependency>    <groupId>com.jezhumble.javasysmon</groupId>    <artifactId>javasysmon</artifactId>    <version>1.0.1</version></dependency>
    #配置修改和查询 nacos 服务的api ,其它配置是为了灵活管理tools.jvm.monitor.enable = truetools.remote-shell.enable_custom_rule = truetools.remote-shell.nacos_namespace_id = xxx_sittools.remote-shell.nacos_server_url = http://ip:port/tools.remote-shell.nacos_instance_list_api = nacos/v1/ns/instance/list?namespaceId=%s&serviceName=%s&healthyOnly=truetools.remote-shell.nacos_instance_edit_api = nacos/v1/ns/instancetools.remote-shell.nacos_metadata_label_key = algo_labeltools.remote-shell.nacos_metadata_cpu_usage_key = jvm_cputools.remote-shell.nacos_metadata_left_mem_key = jvm_left_memtools.remote-shell.enable_nacos_metadata_filter = false
    @Configuration@EnableAsync@EnableScheduling@ConditionalOnProperty(value = "tools.jvm.monitor.enable",havingValue = "true")@Component("jvmMonitorRunner")public class JVMMonitorRunner {    private Logger log = LoggerFactory.getLogger(this.getClass());    @Value("${server.port}")    private Integer SERVER_PORT;    @Value("${spring.application.name}")    private String SPRING_APPLICATION_NAME;
    private final JavaSysMon javaSysMon; @Autowired private RemoteShellProperties remoteShellProperties;
    public JVMMonitorRunner() { javaSysMon = new JavaSysMon(); } public static final Long ONE_MB = 1048576L; /******************************************************************/ @Async("jvmMonitorExecutor") @Scheduled(cron = "${xpilot.tools.jvm.monitor.cron:0/1 * * * * ?}") public void jvmMonitor() { try { if (!remoteShellProperties.getEnable_custom_rule()) { return; } if(!remoteShellProperties.getEnable_nacos_metadata_filter()){ log.debug("enable_nacos_metadata_filter is false,skip jvmMonitor"); return; } if(StringUtils.isEmpty(remoteShellProperties.getNacos_server_url()) || StringUtils.isEmpty(remoteShellProperties.getNacos_instance_edit_api())){ throw new RuntimeException("Nacos server url or instance edit api is empty"); }

    long total = javaSysMon.physical().getTotalBytes() / ONE_MB; long left = javaSysMon.physical().getAvailableBytes() / ONE_MB; long now = total - left; float cpu = this.getCpuUsage(); String property = System.getProperty(remoteShellProperties.getNacos_metadata_label_key()); log.debug("jvmMonitor info total:{},left:{},now:{},cpu:{}" ,total,left,now,cpu);
    String url = remoteShellProperties.getNacos_server_url() + remoteShellProperties.getNacos_instance_edit_api(); //String editUrl = "http://8.130.126.187:8257/nacos/v1/ns/instance";
    //构建元数据 Map<String,Object> metadata = new HashMap<>(1<<2); metadata.put(remoteShellProperties.getNacos_metadata_left_mem_key(),left); metadata.put(remoteShellProperties.getNacos_metadata_cpu_usage_key(),cpu); metadata.put(remoteShellProperties.getNacos_metadata_label_key(),property);
    //调用接口修改当前实例对应的元数据 Map<String,Object> params = new HashMap<>(1<<3); params.put(SERVICE_KEY_NAMESPACE_ID,remoteShellProperties.getNacos_namespace_id()); params.put(SERVICE_KEY_SERVICE_NAME,SPRING_APPLICATION_NAME); params.put(SERVICE_KEY_IP,InetAddress.getLocalHost().getHostAddress()); params.put(SERVICE_KEY_PORT,SERVER_PORT); params.put(SERVICE_KEY_METADATA, JSON.toJSONString(metadata));
    this.reportMetadata(url,params); }catch (Exception e){ log.error("jvmMonitor error:", e); } }

    @Retryable(backoff = @Backoff(delay = 2000,multiplier = 1)) private String reportMetadata(String url,Map<String,Object> params){ HttpResponse response = HttpRequest.put(url). form(params). timeout(NACOS_TIME_OUT). execute(); String body = response.body(); log.debug("reportMetadata url:{} ,params:{},response:{}",url,params,body); return body; }
    public double getJVMUsage() { double jvmUsage = 0.0d; try { Runtime runtime = Runtime.getRuntime(); long total = runtime.totalMemory(); long free = runtime.freeMemory(); long used = total - free; jvmUsage = used * 100.0f / total; }catch (Exception e) { //log.error("getJVMUsage error", e); } return jvmUsage; }
    public float getCpuUsage() { CpuTimes cpuTimes = javaSysMon.cpuTimes(); try { Thread.sleep(500L); } catch (InterruptedException e) { log.error("", e); return 1f; } return javaSysMon.cpuTimes().getCpuUsage(cpuTimes); }}

    如果算法有多个版本,可以通过在启动参数中增加指定key用于表示要调用哪个版本的算法,不出意外,成功后可以通过注册中心看到如下情况:



    根据传入的参数(如期望运行算法所需的内存大小、算法版本...)过滤出满足条件的实例,代码如下:

    @Component@Slf4jpublic class RemoteLoadBalanceHelper {
    @Autowired private RemoteShellProperties remoteShellProperties;
    @Autowired private ObjectMapper objectMapper;
    public LbResultVo getLbResultVo(Long expectedMemory, String label) throws ShellBusyException, LabelNotFoundException, ServiceNotFoundException { //所有服务调用根据label+serviceName获取对应的服务 并且在里面负载均衡(使用随机策略)如果找不到服务返回异常 如果找到了但是内存都不足,返回忙碌异常 //step1.未打开配置 if (!remoteShellProperties.getEnable_custom_rule()) { return new LbResultVo(); } if(StringUtils.isEmpty(remoteShellProperties.getNacos_server_url()) || StringUtils.isEmpty(remoteShellProperties.getNacos_instance_list_api())){ throw new RuntimeException("Nacos server url or instance list api is empty"); } String serviceName = remoteShellProperties.getServiceName();
    //step2.查找当前运行环境对应的服务列表 //demo: nacos/v1/ns/instance/list?namespaceId=%s&serviceName=%s&healthyOnly=true String url = remoteShellProperties.getNacos_server_url() + remoteShellProperties.getNacos_instance_list_api(); url = String.format(url, remoteShellProperties.getNacos_namespace_id(), serviceName); HttpResponse response = HttpRequest.get(url).timeout(NACOS_TIME_OUT).execute(); //HttpResponse response = HttpRequest.get("http://ip:port/nacos/v1/ns/instance/list?namespaceId=cloud_mapping_sit&serviceName=" + serviceName+ "&healthyOnly=true").timeout(NACOS_TIME_OUT).execute(); if(response.getStatus() != HttpStatus.SC_OK){ log.error("Nacos request failed,code:{}",response.getStatus()); throw new NetworkException("Service return code:["+response.getStatus()+"] unavailable.Check Please"); }
    JsonNode hosts = null; try { hosts = objectMapper.readTree(response.bodyBytes()).get(SERVICE_KEY_HOSTS); } catch (IOException e) { throw new RuntimeException(e); } if(hosts == null || hosts.isEmpty()){ throw new ServiceNotFoundException(); }
    //step3.根据指定条件过滤列表 List<LbResultVo> instances = new ArrayList<>(hosts.size()); for (JsonNode host : hosts) { JsonNode metadata = host.get(SERVICE_KEY_METADATA); if(metadata == null){ continue; }
    JsonNode nodeIp = host.get(SERVICE_KEY_IP); String ip = nodeIp == null ? BLANK : nodeIp.asText();
    JsonNode nodePort = host.get(SERVICE_KEY_PORT); int port = nodePort == null ? 0 : nodePort.asInt();
    //case0.未开启元数据过滤 则所有的都作为可用实例 if(!remoteShellProperties.getEnable_nacos_metadata_filter()){ LbResultVo lbResultVo = new LbResultVo(ip, port); lbResultVo.setServiceName(serviceName); //默认值 代表无限大 未时使用过,避免shellBusyException lbResultVo.setFreeMem(NACOS_SERVER_NORMAL); lbResultVo.setTotalMem(NACOS_SERVER_NORMAL); lbResultVo.setCpuUsage(0.00d); instances.add(lbResultVo); continue; }
    JsonNode nodeLeft = metadata.get(remoteShellProperties.getNacos_metadata_left_mem_key()); double left = nodeLeft == null ? 0.0d : nodeLeft.asDouble();
    JsonNode nodeCpu = metadata.get(remoteShellProperties.getNacos_metadata_cpu_usage_key()); double cpu = nodeCpu == null ? 0.0d : nodeCpu.asDouble();
    JsonNode nodeProperty = metadata.get(remoteShellProperties.getNacos_metadata_label_key()); String property = nodeProperty == null ? BLANK : nodeProperty.asText();
    //case1.找出元数据中不包含指定key的实例或包含指定key但值为空的实例 if(StringUtils.isEmpty(label) && StringUtils.isEmpty(property)){ instances.add(new LbResultVo(left, cpu, serviceName, ip, port, property)); continue; } //case2.找出元数据中包含指定key的实例且值与传入值一致的实例 if(!property.equals(label)){ instances.add(new LbResultVo(left, cpu, serviceName, ip, port, property)); } } if(CollectionUtils.isEmpty(instances)){ throw new LabelNotFoundException(); } List<LbResultVo> targets = instances.stream().filter(instance -> instance.getFreeMem() >= expectedMemory).collect(Collectors.toList()); if(CollectionUtils.isEmpty(targets)){ throw new ShellBusyException(); } //step4.多个需要随机返回一个 return this.randomInstance(targets); }
    private LbResultVo randomInstance(List<LbResultVo> instances) { if(instances.size() == 1){ return instances.get(0); } return new ArrayList<>(instances).get(new Random().nextInt(instances.size())); }}

    上面的逻辑解决了按算法所需内存查询对应的实例,算是实现了一个小小的loadbalance。


  8. 其它方案

  9. 这里更多的是记录一次学习成长的过程,其实理解下来就是需要弹性的管理服务,目前我们已经切为k8s管理。优点如下

    资源统一管理:节点所有资源(CPU、内存)作为资源池统一管理, 提供不同的算法程序运行。

    资源充分利用:通过自定义配置(yaml)包括内存、CPU、优先级等配置实现丰富的调度策略,最大化利用节点资源。

    自我修复:在节点失败的情况下,将运行的容器迁移到其他节点。

    水平扩展:方便的添加节点, 就可以实现服务的水平扩展,扩容资源池。

    存储编排:可以自动挂载所选择的存储系统,公开存储作为卷给Pod(不同的算法程

    序)。


    划水专用:

    https://nacos.io/docs/latest/guide/user/open-api/#3.1

    https://nacos.io/zh-cn/docs/open-api.html

    https://blog.csdn.net/sD7O95O/article/details/121942841



    纸上得来终觉浅,绝知此事要躬行

    ——陆游《冬夜读书示子聿》




晚霞程序员
一位需要不断学习的30+程序员……