背景概述
上篇内容我们分享了如何在仪表盘上借助up指标显示主机名。当然,我们是借用node_exporter的一个指标实现的,所以当up为0时,也就意味着exporter已经不可用了,此时是不会显示主机名称的哦。今天又借着这个机会看了一下node_exporter的代码,顺便也记录一下如何实现一个自定义指标收集器。
实现思路
实现指标收集大致逻辑 自定义指标 验证
指标收集大致逻辑
node_exporter.go
代码入口
// main parses the command-line flags, installs the metrics handler (plus a
// landing page when metrics are not served from "/") on the default mux, and
// then serves HTTP until the listener returns an error.
func main() {
	// Flags are declared in the same order as before so the generated help
	// output keeps its layout.
	metricsPath := kingpin.Flag(
		"web.telemetry-path",
		"Path under which to expose metrics.",
	).Default("/metrics").String()
	disableExporterMetrics := kingpin.Flag(
		"web.disable-exporter-metrics",
		"Exclude metrics about the exporter itself (promhttp_*, process_*, go_*).",
	).Bool()
	maxRequests := kingpin.Flag(
		"web.max-requests",
		"Maximum number of parallel scrape requests. Use 0 to disable.",
	).Default("40").Int()
	disableDefaultCollectors := kingpin.Flag(
		"collector.disable-defaults",
		"Set all collectors to disabled by default.",
	).Default("false").Bool()
	maxProcs := kingpin.Flag(
		"runtime.gomaxprocs", "The target number of CPUs Go will run on (GOMAXPROCS)",
	).Envar("GOMAXPROCS").Default("1").Int()
	toolkitFlags := kingpinflag.AddFlags(kingpin.CommandLine, ":9100")

	promlogConfig := &promlog.Config{}
	flag.AddFlags(kingpin.CommandLine, promlogConfig)
	kingpin.Version(version.Print("node_exporter"))
	kingpin.CommandLine.UsageWriter(os.Stdout)
	kingpin.HelpFlag.Short('h')
	kingpin.Parse()
	logger := promlog.New(promlogConfig)

	if *disableDefaultCollectors {
		collector.DisableDefaultCollectors()
	}
	level.Info(logger).Log("msg", "Starting node_exporter", "version", version.Info())
	level.Info(logger).Log("msg", "Build context", "build_context", version.BuildContext())

	// Warn (but keep going) when the process runs with uid 0.
	if u, err := user.Current(); err == nil && u.Uid == "0" {
		level.Warn(logger).Log("msg", "Node Exporter is running as root user. This exporter is designed to run as unprivileged user, root is not required.")
	}

	runtime.GOMAXPROCS(*maxProcs)
	level.Debug(logger).Log("msg", "Go MAXPROCS", "procs", runtime.GOMAXPROCS(0))

	http.Handle(*metricsPath, newHandler(!*disableExporterMetrics, *maxRequests, logger))

	// Only install the landing page when "/" is not already taken by the
	// metrics endpoint itself.
	if *metricsPath != "/" {
		landingPage, err := web.NewLandingPage(web.LandingConfig{
			Name:        "Node Exporter",
			Description: "Prometheus Node Exporter",
			Version:     version.Info(),
			Links: []web.LandingLinks{
				{
					Address: *metricsPath,
					Text:    "Metrics",
				},
			},
		})
		if err != nil {
			level.Error(logger).Log("err", err)
			os.Exit(1)
		}
		http.Handle("/", landingPage)
	}

	srv := &http.Server{}
	err := web.ListenAndServe(srv, toolkitFlags, logger)
	if err != nil {
		level.Error(logger).Log("err", err)
		os.Exit(1)
	}
}
❝这里我们主要是看一下这个Handle
newHandler
// newHandler builds the scrape handler. When includeExporterMetrics is set,
// the process and Go runtime collectors are registered on a dedicated
// registry. The unfiltered inner handler is created eagerly; a failure to do
// so panics.
func newHandler(includeExporterMetrics bool, maxRequests int, logger log.Logger) *handler {
	h := &handler{
		exporterMetricsRegistry: prometheus.NewRegistry(),
		includeExporterMetrics:  includeExporterMetrics,
		maxRequests:             maxRequests,
		logger:                  logger,
	}

	if h.includeExporterMetrics {
		h.exporterMetricsRegistry.MustRegister(
			promcollectors.NewProcessCollector(promcollectors.ProcessCollectorOpts{}),
			promcollectors.NewGoCollector(),
		)
	}

	unfiltered, err := h.innerHandler()
	if err != nil {
		panic(fmt.Sprintf("Couldn't create metrics handler: %s", err))
	}
	h.unfilteredHandler = unfiltered
	return h
}
❝这里主要是自身指标以及内部收集器,我们可以理解为内存、磁盘等等的指标
innerHandler
// innerHandler returns an http.Handler that serves the metrics of the node
// collector restricted to filters (all enabled collectors when filters is
// empty).
//
// When exporter metrics are included, the handler gathers from both the
// exporter registry and the node registry and is instrumented with the
// promhttp metrics. Errors from creating or registering the node collector
// are wrapped with %w so callers can inspect the cause with errors.Is/As.
func (h *handler) innerHandler(filters ...string) (http.Handler, error) {
	nc, err := collector.NewNodeCollector(h.logger, filters...)
	if err != nil {
		return nil, fmt.Errorf("couldn't create collector: %w", err)
	}

	// Only log the creation of an unfiltered handler, which should happen
	// only once upon startup.
	if len(filters) == 0 {
		level.Info(h.logger).Log("msg", "Enabled collectors")
		names := make([]string, 0, len(nc.Collectors))
		for n := range nc.Collectors {
			names = append(names, n)
		}
		// Map iteration order is random; sort for a stable log.
		sort.Strings(names)
		for _, c := range names {
			level.Info(h.logger).Log("collector", c)
		}
	}

	r := prometheus.NewRegistry()
	r.MustRegister(versioncollector.NewCollector("node_exporter"))
	if err := r.Register(nc); err != nil {
		return nil, fmt.Errorf("couldn't register node collector: %w", err)
	}

	// Options shared by both variants; Registry is only set when exporter
	// metrics are exposed.
	opts := promhttp.HandlerOpts{
		ErrorLog:            stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0),
		ErrorHandling:       promhttp.ContinueOnError,
		MaxRequestsInFlight: h.maxRequests,
	}
	if !h.includeExporterMetrics {
		return promhttp.HandlerFor(r, opts), nil
	}

	opts.Registry = h.exporterMetricsRegistry
	metricsHandler := promhttp.HandlerFor(
		prometheus.Gatherers{h.exporterMetricsRegistry, r},
		opts,
	)
	// Note that we have to use h.exporterMetricsRegistry here to
	// use the same promhttp metrics for all expositions.
	return promhttp.InstrumentMetricHandler(
		h.exporterMetricsRegistry, metricsHandler,
	), nil
}
❝这里核心是nc,我们接着看一下nc
NewNodeCollector
// NewNodeCollector creates a NodeCollector holding every enabled collector,
// or only those named in filters when filters is non-empty. Naming an unknown
// or disabled collector in filters is an error. Collector instances are
// created once via their factory and reused on later calls.
func NewNodeCollector(logger log.Logger, filters ...string) (*NodeCollector, error) {
	wanted := make(map[string]bool)
	for _, name := range filters {
		enabled, known := collectorState[name]
		switch {
		case !known:
			return nil, fmt.Errorf("missing collector: %s", name)
		case !*enabled:
			return nil, fmt.Errorf("disabled collector: %s", name)
		}
		wanted[name] = true
	}

	selected := make(map[string]Collector)
	initiatedCollectorsMtx.Lock()
	defer initiatedCollectorsMtx.Unlock()
	for name, enabled := range collectorState {
		if !*enabled {
			continue
		}
		if len(wanted) > 0 && !wanted[name] {
			continue
		}
		c, cached := initiatedCollectors[name]
		if !cached {
			var err error
			c, err = factories[name](log.With(logger, "collector", name))
			if err != nil {
				return nil, err
			}
			initiatedCollectors[name] = c
		}
		selected[name] = c
	}
	return &NodeCollector{Collectors: selected, logger: logger}, nil
}
❝获取NodeCollector并返回
collectorState
// registerCollector declares the boolean flag "collector.<name>" for the
// given collector, defaulting to isDefaultEnabled, and records both the flag
// and the factory under the collector's name.
func registerCollector(collector string, isDefaultEnabled bool, factory func(logger log.Logger) (Collector, error)) {
	helpDefaultState := "disabled"
	if isDefaultEnabled {
		helpDefaultState = "enabled"
	}

	enabled := kingpin.Flag(
		fmt.Sprintf("collector.%s", collector),
		fmt.Sprintf("Enable the %s collector (default: %s).", collector, helpDefaultState),
	).Default(fmt.Sprintf("%v", isDefaultEnabled)).Action(collectorFlagAction(collector)).Bool()

	collectorState[collector] = enabled
	factories[collector] = factory
}
❝这里就是我们指标使用的一个注册函数。
其他的流程我们就不分析了,注册进去后,还会调用各收集器的Update方法。
自定义指标
我们这次定义一个登录用户的监控指标,也就是执行who命令后返回的内容;如果系统上没有该命令,可以使用last -f /var/run/utmp
,这里就不做过多的解释了。
collector/loginuser.go
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build (darwin || linux || openbsd || netbsd) && !nologinuser
// +build darwin linux openbsd netbsd
// +build !nologinuser
package collector
import (
"bufio"
"fmt"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"os/exec"
"strings"
"time"
)
// loginUserSubsystem is the subsystem part of the metric names exported by
// the login user collector.
const loginUserSubsystem = "loginuser"
// loginUserCollector exports one metric per login session reported by
// `last -f /var/run/utmp`.
type loginUserCollector struct {
	// logger is the logger handed to the collector by its factory.
	logger log.Logger
}
// init registers the collector under the name "loginuser" (flag
// "collector.loginuser") with the defaultEnabled state and
// NewLoginUserCollector as its factory.
func init() {
	registerCollector("loginuser", defaultEnabled, NewLoginUserCollector)
}
// NewLoginUserCollector returns a Collector that reports the currently
// logged-in users. It never fails; the error result exists to satisfy the
// collector factory signature.
func NewLoginUserCollector(logger log.Logger) (Collector, error) {
	c := &loginUserCollector{logger: logger}
	return c, nil
}
func (l *loginUserCollector) Update(ch chan<- prometheus.Metric) error {
var metricType = prometheus.GaugeValue
scanner := bufio.NewScanner(strings.NewReader(l.getLoginUser()))
for scanner.Scan() {
line := scanner.Text()
fields := strings.Fields(line)
if len(fields) == 10 {
user := fields[0]
terminal := fields[1]
ipAddress := fields[2]
dateStr := fmt.Sprintf("%s %s %s %s", fields[3], fields[4], fields[5], fields[6])
parsedTime, _ := time.Parse("Mon Jan 2 15:04", dateStr)
// 转换为当前年份
currentYear := time.Now().Year()
parsedTime = parsedTime.AddDate(currentYear-parsedTime.Year(), 0, 0)
// 转换为 Unix 时间戳
unixTimestamp := parsedTime.Unix()
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(namespace, loginUserSubsystem, "info"),
"This is login user info",
[]string{"user", "ip", "tty"}, nil),
metricType, float64(unixTimestamp), user, ipAddress, terminal,
)
}
}
return nil
}
// getLoginUser runs `last -f /var/run/utmp` and returns its standard output
// with the final line (the "... begins ..." trailer) removed. It returns an
// empty string when the command fails.
//
// The command is executed directly rather than through `sh -c ... | head -n -1`:
// negative line counts are a GNU head extension, and going through a shell
// pipeline hid the exit status of `last`. Only stdout is read so that
// diagnostics written to stderr are never parsed as session lines.
func (l *loginUserCollector) getLoginUser() string {
	out, err := exec.Command("last", "-f", "/var/run/utmp").Output()
	if err != nil {
		if l.logger != nil {
			// Best effort: a logging failure must not affect the scrape.
			_ = l.logger.Log("msg", "running last failed", "err", err)
		}
		return ""
	}

	// Drop exactly the last line, like `head -n -1` did.
	text := strings.TrimSuffix(string(out), "\n")
	end := strings.LastIndexByte(text, '\n')
	if end < 0 {
		return ""
	}
	return text[:end+1]
}
❝这里我们使用调用系统命令的方式,执行
last -f /var/run/utmp | head -n -1
,然后将登录时间的unix时间戳作为了value。
验证
我仅在centos7系统上进行的验证哦,如果你是其他系统,可以根据代码自行修改一下。
[root@192 inconsistent_metrics]# who
root tty1 2024-07-02 17:41
root pts/0 2024-07-02 17:41 (192.168.10.70)
root pts/1 2024-07-03 18:39 (192.168.10.70)
root pts/3 2024-07-04 15:29 (192.168.10.70)
[root@192 inconsistent_metrics]# last -f /var/run/utmp
root pts/3 192.168.10.70 Thu Jul 4 15:29 still logged in
root pts/1 192.168.10.70 Wed Jul 3 18:39 still logged in
root pts/0 192.168.10.70 Tue Jul 2 17:41 still logged in
root tty1 Tue Jul 2 17:41 still logged in
reboot system boot 3.10.0-1160.114. Tue Jul 2 17:41 - 23:09 (2+05:27)
总结
有时一个正确的选择往往会对我们的学习提供不可估量的帮助。我们不是一个人在努力而是一群志同道合的人一起努力。
以上内容与问题来自于知识星球,当然如果你想要加入星球可以扫描下方二维码,备注星球即可。