如何在Prometheus告警恢复时获取实时值

科技   2024-11-22 17:43   上海  

背景概述

Prometheus 告警恢复时,恢复通知中是拿不到指标当前值的。虽然也有其他方式可以获取,但都不够优雅。为此我阅读了 Prometheus 的源码,把最近的学习内容整理出来分享给大家。

告警恢复如何获取当前值

  1. 告警规则里添加promQL,需要去掉告警条件

    需要修改的地方比较多,如果查询的promQL数据比较多返回比较慢。因为没有进行标签匹配,会返回所有的查询内容

  2. 自建告警中心,利用promQL进行查询

    需要有编码能力,灵活性比较好。

  3. 解析promQL

    这里说的解析promQL是将其解析为AST,然后去除告警条件,并附加告警标签。

解析PromQL

这里我们讲解一下如何解析PromQL,当然你可以通过接口进行解析,可以看上边的AST,我们这里使用代码的方式进行解析。

解析为AST

import (
"encoding/json"
"fmt"
"github.com/prometheus/prometheus/model/labels"
"log"
"strings"

"github.com/prometheus/prometheus/promql/parser"
)



// sanitizeList returns l unchanged when it is non-nil and an empty, non-nil
// slice otherwise. A nil slice is encoded by encoding/json as null, whereas an
// empty slice is encoded as [], so this keeps list fields as JSON arrays.
func sanitizeList(l []string) []string {
	if l != nil {
		return l
	}
	return make([]string, 0)
}

// translateMatchers converts label matchers into a list of
// {"name", "value", "type"} maps.
//
// The returned list always starts with the extra equality matcher
// __addLabels__="www_kubesre_com", followed by one entry per element of in,
// in the same order. The result is never nil.
func translateMatchers(in []*labels.Matcher) interface{} {
	result := make([]map[string]interface{}, 0, len(in)+1)

	// Extra label appended to every selector.
	result = append(result, map[string]interface{}{
		"name":  "__addLabels__",
		"value": "www_kubesre_com",
		"type":  "=",
	})

	for i := range in {
		matcher := in[i]
		entry := map[string]interface{}{
			"name":  matcher.Name,
			"value": matcher.Value,
			"type":  matcher.Type.String(),
		}
		result = append(result, entry)
	}
	return result
}

// getStartOrEnd returns the string form of startOrEnd, or nil when
// startOrEnd is the zero ItemType.
func getStartOrEnd(startOrEnd parser.ItemType) interface{} {
	if startOrEnd != 0 {
		return startOrEnd.String()
	}
	return nil
}

// translateAST converts a parsed PromQL expression into a tree of maps that
// can be serialised with encoding/json.
//
// Besides the plain translation it rewrites the expression in two ways:
//   - binary expressions whose operator is a comparison are emitted WITHOUT
//     the "op" key, which is how the alerting condition is dropped;
//   - every vector/matrix selector gets its matchers from translateMatchers.
//
// A nil node yields nil. Each visited node type is printed to stdout as a
// trace. The function panics on a node type it does not handle.
func translateAST(node parser.Expr) interface{} {
	if node == nil {
		return nil
	}

	switch n := node.(type) {
	case *parser.AggregateExpr:
		fmt.Println("AggregateExpr")
		return map[string]interface{}{
			"type":     "aggregation",
			"op":       n.Op.String(),
			"expr":     translateAST(n.Expr),
			"param":    translateAST(n.Param),
			"grouping": sanitizeList(n.Grouping),
			"without":  n.Without,
		}
	case *parser.BinaryExpr:
		fmt.Println("BinaryExpr")
		// matching stays nil when the expression carries no vector matching.
		var matching interface{}
		if m := n.VectorMatching; m != nil {
			matching = map[string]interface{}{
				"card":    m.Card.String(),
				"labels":  sanitizeList(m.MatchingLabels),
				"on":      m.On,
				"include": sanitizeList(m.Include),
			}
		}
		if n.Op.IsComparisonOperator() {
			// Deliberately omit "op": this removes the alert threshold
			// comparison from the serialised tree.
			return map[string]interface{}{
				"type":     "binaryExpr",
				"lhs":      translateAST(n.LHS),
				"rhs":      translateAST(n.RHS),
				"matching": matching,
				"bool":     n.ReturnBool,
			}
		}

		return map[string]interface{}{
			"type":     "binaryExpr",
			"op":       n.Op.String(),
			"lhs":      translateAST(n.LHS),
			"rhs":      translateAST(n.RHS),
			"matching": matching,
			"bool":     n.ReturnBool,
		}
	case *parser.Call:
		args := []interface{}{}
		for _, arg := range n.Args {
			args = append(args, translateAST(arg))
		}
		fmt.Println("Call")
		return map[string]interface{}{
			"type": "call",
			"func": map[string]interface{}{
				"name":       n.Func.Name,
				"argTypes":   n.Func.ArgTypes,
				"variadic":   n.Func.Variadic,
				"returnType": n.Func.ReturnType,
			},
			"args": args,
		}
	case *parser.MatrixSelector:
		// Panics if the wrapped selector is not a *parser.VectorSelector.
		vs := n.VectorSelector.(*parser.VectorSelector)
		fmt.Println("MatrixSelector")
		return map[string]interface{}{
			"type":       "matrixSelector",
			"name":       vs.Name,
			"range":      n.Range.Milliseconds(),
			"offset":     vs.OriginalOffset.Milliseconds(),
			"matchers":   translateMatchers(vs.LabelMatchers),
			"timestamp":  vs.Timestamp,
			"startOrEnd": getStartOrEnd(vs.StartOrEnd),
		}
	case *parser.SubqueryExpr:
		// Translate the inner expression once and reuse the result for both
		// the trace line and the node; translating it twice would walk the
		// whole subtree a second time for every level of subquery nesting.
		inner := translateAST(n.Expr)
		fmt.Println("SubqueryExpr", inner)
		return map[string]interface{}{
			"type":       "subquery",
			"expr":       inner,
			"range":      n.Range.Milliseconds(),
			"offset":     n.OriginalOffset.Milliseconds(),
			"step":       n.Step.Milliseconds(),
			"timestamp":  n.Timestamp,
			"startOrEnd": getStartOrEnd(n.StartOrEnd),
		}
	case *parser.NumberLiteral:
		fmt.Println("NumberLiteral")
		// Keep the literal's textual form so that numbers used in arithmetic
		// or as aggregation parameters are not lost. It is stored as a string
		// (not a float) so the map stays a map[string]string.
		return map[string]string{
			"type": "numberLiteral",
			"val":  n.String(),
		}
	case *parser.ParenExpr:
		fmt.Println("ParenExpr")
		return map[string]interface{}{
			"type": "parenExpr",
			"expr": translateAST(n.Expr),
		}
	case *parser.StringLiteral:
		fmt.Println("StringLiteral")
		return map[string]interface{}{
			"type": "stringLiteral",
			"val":  n.Val,
		}
	case *parser.UnaryExpr:
		fmt.Println("UnaryExpr")
		return map[string]interface{}{
			"type": "unaryExpr",
			"op":   n.Op.String(),
			"expr": translateAST(n.Expr),
		}
	case *parser.VectorSelector:
		fmt.Println("VectorSelector")
		return map[string]interface{}{
			"type":       "vectorSelector",
			"name":       n.Name,
			"offset":     n.OriginalOffset.Milliseconds(),
			"matchers":   translateMatchers(n.LabelMatchers),
			"timestamp":  n.Timestamp,
			"startOrEnd": getStartOrEnd(n.StartOrEnd),
		}
	}
	panic("unsupported node type")
}

这里我们不仅将其解析为AST,还去掉了告警判断条件,并添加了标签。

AST解析为PromQL

接下来讲解如何把AST重新组装为PromQL,便于我们进行查询。

// interfaceSliceToStringSlice converts a slice of interface values into a
// slice of strings of the same length and order. The result is never nil.
// It panics if any element is not a string.
func interfaceSliceToStringSlice(slice []interface{}) []string {
	converted := make([]string, 0, len(slice))
	for idx := range slice {
		converted = append(converted, slice[idx].(string))
	}
	return converted
}

// assemblePromQL renders one node of the map-based AST back into a PromQL
// string.
//
// node is expected to have the shape produced by translateAST, either after a
// JSON round trip (numbers are float64, lists are []interface{}) or directly
// (numbers are int64, lists are typed slices); both are accepted.
//
// Rendering rules:
//   - a binaryExpr with no "op" (or a comparison operator with no vector
//     matching) is treated as a stripped alert condition: only its left-hand
//     side is returned;
//   - every other binaryExpr is rendered as "(lhs op [bool] [matching] rhs)";
//   - a selector's __name__="<name>" matcher is skipped when the metric name
//     is already written in front of the braces, so the name is not set twice;
//   - label and string values are quoted with Go escaping (%q);
//   - durations are written in milliseconds, e.g. [300000ms];
//   - "timestamp"/"startOrEnd" (the @ modifier) are NOT reproduced.
//
// Missing or malformed children render as the empty string instead of
// panicking. An unknown or missing "type" returns "".
func assemblePromQL(node map[string]interface{}) string {
	switch node["type"] {
	case "aggregation":
		return assembleAggregation(node)

	case "binaryExpr":
		return assembleBinaryExpr(node)

	case "call":
		fn := astNode(node["func"])
		name, _ := fn["name"].(string)
		rawArgs := astList(node["args"])
		args := make([]string, 0, len(rawArgs))
		for _, arg := range rawArgs {
			args = append(args, assemblePromQL(astNode(arg)))
		}
		return fmt.Sprintf("%s(%s)", name, strings.Join(args, ", "))

	case "matrixSelector":
		return fmt.Sprintf("%s[%dms]%s", assembleSelector(node), astMillis(node["range"]), assembleOffset(node))

	case "numberLiteral":
		// "val" is a string when written by translateAST; accept a JSON
		// number as well. A node without "val" renders as "".
		switch val := node["val"].(type) {
		case string:
			return val
		case float64:
			return fmt.Sprint(val)
		}
		return ""

	case "parenExpr":
		return fmt.Sprintf("(%s)", assemblePromQL(astNode(node["expr"])))

	case "stringLiteral":
		val, _ := node["val"].(string)
		return fmt.Sprintf("%q", val)

	case "subquery":
		// A zero step is written as an empty step, i.e. "[range:]".
		step := ""
		if ms := astMillis(node["step"]); ms != 0 {
			step = fmt.Sprintf("%dms", ms)
		}
		return fmt.Sprintf("%s[%dms:%s]%s", assemblePromQL(astNode(node["expr"])), astMillis(node["range"]), step, assembleOffset(node))

	case "unaryExpr":
		op, _ := node["op"].(string)
		return op + assemblePromQL(astNode(node["expr"]))

	case "vectorSelector":
		return assembleSelector(node) + assembleOffset(node)
	}

	return ""
}

// assembleAggregation renders "op [by|without (labels)](param, expr)".
func assembleAggregation(node map[string]interface{}) string {
	op, _ := node["op"].(string)
	labels := astStrings(node["grouping"])
	without, _ := node["without"].(bool)

	modifier := ""
	switch {
	case without:
		// "without ()" is meaningful even with no labels, so always emit it.
		modifier = " without (" + strings.Join(labels, ", ") + ")"
	case len(labels) > 0:
		modifier = " by (" + strings.Join(labels, ", ") + ")"
	}

	args := assemblePromQL(astNode(node["expr"]))
	if param := assemblePromQL(astNode(node["param"])); param != "" {
		args = param + ", " + args
	}
	return fmt.Sprintf("%s%s(%s)", op, modifier, args)
}

// assembleBinaryExpr renders a binary expression, or only its left-hand side
// when the node represents a stripped comparison (see assemblePromQL).
func assembleBinaryExpr(node map[string]interface{}) string {
	lhs := assemblePromQL(astNode(node["lhs"]))
	op, _ := node["op"].(string)
	matching := astNode(node["matching"])
	if op == "" || (matching == nil && astIsComparison(op)) {
		return lhs
	}

	var b strings.Builder
	b.WriteString("(" + lhs + " " + op)
	if returnBool, _ := node["bool"].(bool); returnBool {
		b.WriteString(" bool")
	}

	// Indexing a nil map is safe, so no nil guard is needed for matching.
	card, _ := matching["card"].(string)
	grouped := card == "many-to-one" || card == "one-to-many"
	labels := astStrings(matching["labels"])
	on, _ := matching["on"].(bool)
	if on || len(labels) > 0 || grouped {
		// A group modifier must follow on/ignoring, so emit "ignoring ()"
		// when grouping is requested without explicit matching labels.
		keyword := "ignoring"
		if on {
			keyword = "on"
		}
		b.WriteString(" " + keyword + " (" + strings.Join(labels, ", ") + ")")
	}
	if grouped {
		keyword := "group_left"
		if card == "one-to-many" {
			keyword = "group_right"
		}
		b.WriteString(" " + keyword + " (" + strings.Join(astStrings(matching["include"]), ", ") + ")")
	}

	b.WriteString(" " + assemblePromQL(astNode(node["rhs"])) + ")")
	return b.String()
}

// assembleSelector renders "name{matchers}" for a vector or matrix selector.
func assembleSelector(node map[string]interface{}) string {
	name, _ := node["name"].(string)
	rawMatchers := astList(node["matchers"])
	matchers := make([]string, 0, len(rawMatchers))
	for _, raw := range rawMatchers {
		m := astNode(raw)
		// The metric name is already written before the braces; repeating it
		// as __name__="<name>" would set the metric name twice.
		if name != "" && m["name"] == "__name__" && m["type"] == "=" && m["value"] == name {
			continue
		}
		matchers = append(matchers, fmt.Sprintf("%v%v%q", m["name"], m["type"], fmt.Sprint(m["value"])))
	}
	return fmt.Sprintf("%s{%s}", name, strings.Join(matchers, ", "))
}

// assembleOffset renders " offset <n>ms" for a non-zero "offset", else "".
func assembleOffset(node map[string]interface{}) string {
	if ms := astMillis(node["offset"]); ms != 0 {
		return fmt.Sprintf(" offset %dms", ms)
	}
	return ""
}

// astIsComparison reports whether op is a PromQL comparison operator.
func astIsComparison(op string) bool {
	switch op {
	case "==", "!=", ">", "<", ">=", "<=":
		return true
	}
	return false
}

// astNode returns v as an AST node map, or nil if v is not a map.
func astNode(v interface{}) map[string]interface{} {
	switch m := v.(type) {
	case map[string]interface{}:
		return m
	case map[string]string:
		out := make(map[string]interface{}, len(m))
		for k, s := range m {
			out[k] = s
		}
		return out
	}
	return nil
}

// astList returns v as a generic list, or nil if v is not a supported slice.
func astList(v interface{}) []interface{} {
	switch list := v.(type) {
	case []interface{}:
		return list
	case []map[string]interface{}:
		out := make([]interface{}, len(list))
		for i, item := range list {
			out[i] = item
		}
		return out
	case []string:
		out := make([]interface{}, len(list))
		for i, item := range list {
			out[i] = item
		}
		return out
	}
	return nil
}

// astStrings returns the elements of list v as strings; non-string elements
// are formatted with fmt.Sprint.
func astStrings(v interface{}) []string {
	list := astList(v)
	out := make([]string, 0, len(list))
	for _, item := range list {
		if s, ok := item.(string); ok {
			out = append(out, s)
			continue
		}
		out = append(out, fmt.Sprint(item))
	}
	return out
}

// astMillis returns a millisecond count stored as float64 (JSON), int64 or
// int; any other value yields 0.
func astMillis(v interface{}) int64 {
	switch ms := v.(type) {
	case float64:
		return int64(ms)
	case int64:
		return ms
	case int:
		return int64(ms)
	}
	return 0
}

效果展示

func main() {
expr := `(sum by (cluster) (rate(kube_state_metrics_list_total{job="kube-state-metrics",result="error"}[5m])) / sum by (cluster) (rate(kube_state_metrics_list_total{job="kube-state-metrics"}[5m]))) > 0.01`
// 使用 promql parser 解析表达式
parsedExpr, err := parser.ParseExpr(expr)
if err != nil {
log.Fatalf("Error parsing expression: %v", err)
}

aaa := translateAST(parsedExpr)
jsonData, _ := json.MarshalIndent(aaa, "", " ")
fmt.Println(string(jsonData))
var jsonDataMap map[string]interface{}
err = json.Unmarshal(jsonData, &jsonDataMap)
if err != nil {
log.Fatalf("Error parsing JSON: %v", err)
}
promQL := assemblePromQL(jsonDataMap)
fmt.Println(promQL)
)

这里我们用一个稍微复杂的promQL进行测试

AST

image-20241122172251086

PromQL

(((sum by (cluster)(rate(kube_state_metrics_list_total{__addLabels__="www_kubesre_com", job="kube-state-metrics", result="error", __name__="kube_state_metrics_list_total"}[300000ms])) sum by (cluster)(rate(kube_state_metrics_list_total{__addLabels__="www_kubesre_com", job="kube-state-metrics", __name__="kube_state_metrics_list_total"}[300000ms])) )) )

总结

到此我们的本次分享就结束了,希望对大家有所帮助,当然也可以一起交流。至于如何获取告警恢复时的当前值,相信大家已经有思路了。

添加👇下面微信,拉你进群与大佬一起探讨云原生!


云原生运维圈
专注于Docker、Kubernetes、Prometheus、Istio、Terraform、OpenTelemetry等云原生技术分享!
 最新文章