接口拨测 Plus 版本

科技   2024-08-15 09:00   浙江  


!! 大家好,我是乔克,一个爱折腾的运维工程,一个睡觉都被自己丑醒的云原生爱好者。


作者:乔克
公众号:运维开发故事
博客:www.jokerbai.com



之前写了一个《开发一个接口监控的Prometheus Exporter》,当时只是单纯的实现了一个简单的Exporter,但是基本能满足要求,最近对接口监控的需求做了升级,主要有:

  • 接口的管理通过前端页面实现,将数据存入数据库
  • 接口的校验除了可以校验状态码,还增加了返回值校验
  • 前端页面可以显示当前接口的可用性百分比
  • 拨测项可以灵活配置
    • 拨测频率可以灵活调整
    • 拨测结果校验可以灵活配置
    • 可以灵活开启或关闭拨测

功能的实现方式比较简单,梳理如下:

  • 用户创建拨测任务,将任务存入数据库
  • 后端为新的拨测起一个定时任务
  • 后端协程实时监听更新或者删除操作,更新定时任务
  • 拨测任务生成Prometheus指标,供Prometheus收集做监控告警使用

下面简单总结后端的实现和前端的效果。

Tips: 整个项目是使用gin-vue-admin搭建,拨测只是其中一个小功能。

后端实现

(1)定义数据库的结构体

// DialApi 拨测 结构体  
type DialApi struct {  
    global.GVA_MODEL  
    Name           string `json:"name" form:"name" gorm:"column:name;default:'';comment:接口名称;size:32;"`                                       //接口名称  
    Type           string `json:"type" form:"type" gorm:"column:type;default:'';comment:拨测类型 HTTP TCP PING DNS;size:8;"`                      // 拨测类型  
    HttpMethod     string `json:"httpMethod" form:"httpMethod" gorm:"column:http_method;default:GET;comment:HTTP请求方法;size:8;"`                //HTTP请求方法  
    Url            string `json:"url" form:"url" gorm:"column:url;comment:拨测地址;size:255;"binding:"required"`                                  //拨测地址  
    RequestBody    string `json:"requestBody" form:"requestBody" gorm:"column:request_body;comment:请求BODY;size:255;"`                         //拨测地址  
    Enabled        *bool  `json:"enabled" form:"enabled" gorm:"column:enabled;default:false;comment:是否启用;"binding:"required"`                 //是否启用  
    Application    string `json:"application" form:"application" gorm:"column:application;comment:所属应用;size:32;"`                             //所属应用  
    ExceptResponse string `json:"exceptResponse" form:"exceptResponse" gorm:"column:except_response;comment:预期返回值;size:32;"`                  //预期返回值  
    HttpStatus     int    `json:"httpStatus" form:"httpStatus" gorm:"column:http_status;type:smallint(5);default:200;comment:预期状态码;size:16;"` //预期状态码  
    Cron           string `json:"cron" form:"cron" gorm:"column:cron;comment:cron表达式;size:20;"`                                               //cron表达式  
    SuccessRate    string `json:"successRate" form:"successRate" gorm:"column:success_rate;comment:拨测成功率"`  
    CreatedBy      uint   `gorm:"column:created_by;comment:创建者"`  
    UpdatedBy      uint   `gorm:"column:updated_by;comment:更新者"`  
    DeletedBy      uint   `gorm:"column:deleted_by;comment:删除者"`  
}

在结构体中,主要定义拨测相关字段,比如拨测地址,返回值,状态码,拨测频率等,这些字段都通过前端页面填写。

然后就是对拨测任务的增删改查,这类接口比较通用,可以直接复制gin-vue-admin中的实例进行修改。

(2)对于新创建的拨测任务,需要将其加入到定时任务中。在这里做了偷懒,直接使用gin-vue-admin的定时任务功能。因此,需要实现一个Run方法,如下:

type StartDialApi struct{}  
type StartSingleDialApiTask struct{}

func (j *StartDialApi) Run() {  
    var dialService = service.ServiceGroupApp.DialApiServiceGroup.DialApiService  
    // 获取状态为打开的定时任务  
    pageInfo := dialApiReq.DialApiSearch{}  
    dialApiInfoList, _, err := dialService.GetDialApiInfoList(pageInfo)  
    if err == nil {  
       var option []cron.Option  
       option = append(option, cron.WithSeconds())  
       for _, dialApi := range dialApiInfoList {  
          // 将cron的值变成表达式  
          c := utils.ConvertToCronExpression(dialApi.Cron)  
          dialApi.Cron = c  
          dialService.AddSingleDialApiTimerTask(dialApi)  
       }  
    } else {  
       global.GVA_LOG.Error("获取拨测任务列表失败")  
    }  
}

然后会调用dialService.AddSingleDialApiTimerTask实现定时任务的真正操作。

func (dialService *DialApiService) AddSingleDialApiTimerTask(dialApiEntity dialApi.DialApi) {  
    var option []cron.Option  
    option = append(option, cron.WithSeconds())  
    idStr := strconv.Itoa(int(dialApiEntity.ID))  
    cronName := global.DIAL_API + idStr  
    taskName := global.DIAL_API + idStr  
    task, found := global.GVA_Timer.FindTask(cronName, taskName)  
    if !found {  
       if *dialApiEntity.Enabled {  
          _, err := global.GVA_Timer.AddTaskByFunc(cronName, dialApiEntity.Cron, func() {  
             global.HealthCheckResults.WithLabelValues(dialApiEntity.Name, dialApiEntity.Type, "success").Add(0)  
             global.HealthCheckResults.WithLabelValues(dialApiEntity.Name, dialApiEntity.Type, "failed").Add(0)  
             switch dialApiEntity.Type {  
             case "HTTP":  
                ok := checkHTTP(dialApiEntity)  
                if ok {  
                   global.HealthCheckResults.WithLabelValues(dialApiEntity.Name, dialApiEntity.Type, "success").Add(1)  
                } else {  
                   global.HealthCheckResults.WithLabelValues(dialApiEntity.Name, dialApiEntity.Type, "failed").Add(1)  
                }  
  
                // 记录日志  
                logHealthCheckResult(ok, nil, dialApiEntity, "HTTP")  
  
                // 获取Prometheus指标并存入数据库  
                getSuccessRateFromPrometheus(dialApiEntity)  
  
             case "TCP""DNS""ICMP":  
                var ok bool  
                var err error  
                switch dialApiEntity.Type {  
                case "TCP":  
                   ok, err = checkTCP(dialApiEntity)  
                case "DNS":  
                   ok, err = checkDNS(dialApiEntity)  
                case "ICMP":  
                   ok, err = checkICMP(dialApiEntity)  
                }  
                if ok {  
                   global.HealthCheckResults.WithLabelValues(dialApiEntity.Name, dialApiEntity.Type, "success").Add(1)  
                } else {  
                   global.HealthCheckResults.WithLabelValues(dialApiEntity.Name, dialApiEntity.Type, "failed").Add(1)  
                }  
  
                // 记录日志  
                logHealthCheckResult(ok, err, dialApiEntity, dialApiEntity.Type)  
  
                // 获取Prometheus指标并存入数据库  
                getSuccessRateFromPrometheus(dialApiEntity)  
             default:  
                global.GVA_LOG.Error("未知的检测类型",  
                   zap.String("DetectType", dialApiEntity.Type),  
                )  
             }  
          }, global.DIAL_API+idStr, option...)  
          if err != nil {  
             global.GVA_LOG.Error(fmt.Sprintf("添加拨测定时任务失败: %s : %s , 原因是: %s", idStr, dialApiEntity.Name, err.Error()))  
          }  
       }  
    } else {  
       if task.Spec != dialApiEntity.Cron {  
          global.GVA_LOG.Info(fmt.Sprintf("修改定时任务时间: %s", dialApiEntity.Name))  
          global.GVA_Timer.Clear(global.DIAL_API + idStr)  
          dialService.AddSingleDialApiTimerTask(dialApiEntity)  
       } else if !*dialApiEntity.Enabled || dialApiEntity.DeletedAt.Valid {  
          global.GVA_LOG.Info(fmt.Sprintf("停止拨测任务: %s", dialApiEntity.Name))  
          global.GVA_Timer.RemoveTaskByName(cronName, taskName)  
       }  
    }  
}

在该方法中,先判断定时任务是否已经存在,只有不存在且开启拨测的任务才会加入定时任务。否则,就会执行修改或者删除逻辑。

另外,为了方便前端显示拨测成功率,每次执行任务的时候会计算一次成功率,这里采用的是直接计算Prometheus指标,使用getSuccessRateFromPrometheus方法实现,如下:

func getSuccessRateFromPrometheus(dialApiEntity dialApi.DialApi) {  
    // 查询prometheus获取过去1小时的成功率  
    successQuery := fmt.Sprintf(`sum(rate(health_check_results{name="%s", type="%s", status="success"}[1h]))`, dialApiEntity.Name, dialApiEntity.Type)  
    totalQuery := fmt.Sprintf(`sum(rate(health_check_results{name="%s", type="%s"}[1h]))`, dialApiEntity.Name, dialApiEntity.Type)  
    successResponse, err := utils.QueryPrometheus(successQuery, global.GVA_CONFIG.Prometheus.Address)  
    if err != nil {  
       global.GVA_LOG.Error("Failed to query success rate from Prometheus", zap.Error(err))  
       return  
    }  
    totalResponse, err := utils.QueryPrometheus(totalQuery, global.GVA_CONFIG.Prometheus.Address)  
    if err != nil {  
       global.GVA_LOG.Error("Failed to query total rate from Prometheus", zap.Error(err))  
       return  
    }  
  
    // 解析 Prometheus 响应并计算成功率  
    var successValue float64  
    var totalValue float64  
    if len(successResponse.Data.Result) > 0 {  
       for _, result := range successResponse.Data.Result {  
          if value, ok := result.Value[1].(string); ok {  
             if value, err := strconv.ParseFloat(value, 64); err == nil {  
                successValue = value  
             }  
          }  
       }  
    }  
    if len(totalResponse.Data.Result) > 0 {  
       for _, result := range totalResponse.Data.Result {  
          if value, ok := result.Value[1].(string); ok {  
             if value, err := strconv.ParseFloat(value, 64); err == nil {  
                totalValue = value  
             }  
          }  
       }  
    }  
  
    if totalValue > 0 {  
       successRate := CalculateSuccessRate(successValue, totalValue)  
  
       // 获取数据库中最新的值  
       var dialService = DialApiService{}  
       dial, err := dialService.GetDialApi(strconv.Itoa(int(dialApiEntity.ID)))  
       if err != nil {  
          global.GVA_LOG.Error("获取任务失败", zap.String("err", err.Error()))  
       }  
       successRateStr := fmt.Sprintf("%.2f", successRate)  
       if dial.SuccessRate != successRateStr {  
          dial.SuccessRate = successRateStr  
          err := dialService.UpdateDialApi(dial)  
          if err != nil {  
             global.GVA_LOG.Error("更新任务成功率失败", zap.String("err", err.Error()))  
             return  
          }  
       }  
  
    }  
}  
  
// CalculateSuccessRate 计算成功率  
func CalculateSuccessRate(success, total float64) float64 {  
    if total == 0 {  
       return 0  
    }  
    return (success / total) * 100 // 返回百分比形式的成功率  
}

另外,拨测任务支持HTTPTCPDNS以及ICMP(ICMP功能未完善),代码如下:

func checkHTTP(dialApiEntity dialApi.DialApi) bool {  
    idStr := strconv.Itoa(int(dialApiEntity.ID))  
    var response *http.Response = nil  
    var httpErr error = nil  
    switch dialApiEntity.HttpMethod {  
    case "GET":  
       response, httpErr = http.Get(dialApiEntity.Url)  
       break  
    case "POST":  
       response, httpErr = http.Post(dialApiEntity.Url, "application/json", strings.NewReader(dialApiEntity.RequestBody))  
       break  
    default:  
    }  
    if response != nil {  
       dialApiRecrod := new(dialApi.DialApiRecrod)  
       dialApiRecrod.DialApiId = dialApiEntity.ID  
       dialApiRecrod.CreatedAt = time.Now()  
       dialApiRecrod.UpdatedAt = time.Now()  
  
       if httpErr == nil {  
          if response.StatusCode == dialApiEntity.HttpStatus {  
             // 如果定义了返回值判断  
             if dialApiEntity.ExceptResponse != "" {  
                bodyBytes, err := io.ReadAll(response.Body)  
                if err != nil {  
                   return false  
                }  
                if strings.Contains(string(bodyBytes), dialApiEntity.ExceptResponse) {  
                   return true  
                } else {  
                   return false  
                }  
             } else {  
                return true  
             }  
          } else {  
             global.GVA_LOG.Info(idStr + ":" + dialApiEntity.Name + "拨测结果与预期不一致")  
             return false  
          }  
       } else {  
          global.GVA_LOG.Error("拨测失败: " + dialApiEntity.Url)  
          dialApiRecrod.FailReason = httpErr.Error()  
          return false  
       }  
    }  
    return false  
}  
  
func checkTCP(dialApiEntity dialApi.DialApi) (bool, error) {  
    conn, err := net.DialTimeout("tcp", dialApiEntity.Url, 5*time.Second)  
    if err != nil {  
       return false, err  
    }  
    defer conn.Close()  
    return truenil  
}  
  
func checkDNS(dialApiEntity dialApi.DialApi) (bool, error) {  
    _, err := net.LookupHost(dialApiEntity.Url)  
    if err != nil {  
       return false, err  
    }  
    return truenil  
}  
  
func checkICMP(dialApiEntity dialApi.DialApi) (bool, error) {  
    pinger, err := ping.NewPinger(dialApiEntity.Url)  
    if err != nil {  
       return false, err  
    }  
    pinger.Count = 2  
    err = pinger.Run() // Blocks until finished.  
    if err != nil {  
       return false, err  
    }  
    return truenil  
}

其中HTTP拨测是比较常用的,相比之前的Prometheus Exporter,这里丰富了对结果的校验,使拨测的结果值更准确。

(3)如果遇到拨测任务的更新或者删除,有一个定时的协程去处理。如下:

func startUpdateDialCron() {  
    var dialService = service.ServiceGroupApp.DialApiServiceGroup.DialApiService  
    for {  
       select {  
       case updateId := <-global.UpdateDialAPIChannel:  
          // 获取数据  
          if updateId != "" {  
             dial, err := dialService.GetDialApi(updateId)  
             if err != nil {  
                global.GVA_LOG.Error("获取任务失败", zap.String("err", err.Error()))  
                continue  
             } else {  
                // 先删除旧的定时任务  
                global.GVA_LOG.Info("更新定时任务", zap.String("updateId", updateId))  
                cronName := global.DIAL_API + updateId  
                taskName := global.DIAL_API + updateId  
                if _, found := global.GVA_Timer.FindTask(cronName, taskName); found {  
                   global.GVA_Timer.Clear(cronName)  
                   // 启动新的定时任务  
                   // 将cron的值变成表达式  
                   c := utils.ConvertToCronExpression(dial.Cron)  
                   dial.Cron = c  
                   dialService.AddSingleDialApiTimerTask(dial)  
                }  
             }  
          }  
       case deleteId := <-global.DeleteDialAPIChannel:  
          if deleteId != "" {  
             cronName := global.DIAL_API + deleteId  
             taskName := global.DIAL_API + deleteId  
             if _, found := global.GVA_Timer.FindTask(cronName, taskName); found {  
                global.GVA_LOG.Info("删除定时任务", zap.String("updateId", deleteId))  
                global.GVA_Timer.RemoveTaskByName(cronName, taskName)  
             }  
          }  
       }  
    }  
}

该协程监听global.UpdateDialAPIChannelglobal.DeleteDialAPIChannel这两个channel,然后再调用dialService.AddSingleDialApiTimerTask对定时任务进行操作。

上面就是简单的接口拨测的功能实现,因能力有限,所以代码比较混乱。

前端展示

为了便于日常的维护,所以开发一个前端界面,主要支持拨测任务的增删改查。

新增拨测任务,可以灵活选择拨测类型以及定义返回值和状态码。

然后可以查看拨测任务的具体情况,也可以灵活开启或者关闭或者任务。

监控告警

在前端页面只是展示了成功率,实际告警还是通过Prometheus实现,该平台暂未实现直接配置告警。

所以,只需要创建一个Prometheus收集的Job,就可以查看对应的指标,指标名是health_check_results,如下:

然后再配置一个告警规则,在使用率低于100%的时候发送告警通知,如下:

至此,整个功能就实现了,足够满足日常使用。在公有云上,是有成熟的拨测产品,不过有的收费比较贵,好处是可以实现不同地区的拨测,覆盖面比较广。另外,也可以使用Black Exporter实现拨测,其也支持上面的所有功能,只是没有前端的维护界面,不过功能强大很多。


最后,求关注。如果你还想看更多优质原创文章,欢迎关注我们的公众号「运维开发故事」。

如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

你还可以把我的公众号设为「星标」,这样当公众号文章更新时,你会在第一时间收到推送消息,避免错过我的文章更新。




我是 乔克,《运维开发故事》公众号团队中的一员,一线运维农民工,云原生实践者,这里不仅有硬核的技术干货,还有我们对技术的思考和感悟,欢迎关注我们的公众号,期待和你一起成长!



运维开发故事
由一群志同道合的小伙伴共同维护,有运维也有开发,内容不限于Linux运维,devops工具链,k8s容器化技术,监控,日志收集,网络安全,Python或GO开发,团队成员有乔克、wanger、冬哥、素心、华仔、郑哥、夏老师
 最新文章