百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

基于Prometheus的自动化巡检(prometheus自动发现详解)

ccwgpt 2025-07-09 11:14 3 浏览 0 评论



!! 大家好,我是乔克,一个爱折腾的运维工程,一个睡觉都被自己丑醒的云原生爱好者。


作者:乔克
公众号:运维开发故事



道路千万条,安全第一条。操作不规范,运维两行泪。

前言

目前,大部分公司都采用Prometheus + Grafana这一套来做指标监控,所以在Prometheus中也有大量的指标数据。为了满足日常工作中的巡检,可以基于Prometheus实现自动巡检,减轻部分运维压力。

思路

为了灵活管理巡检任务,将整个巡检功能进行了拆分管理,分为:

  • 数据源管理:可以管理多个Prometheus数据源,后期也可以增加其他数据源,比如ES等。
  • 巡检项管理:目前的巡检项就是各种Prometheus规则,之所以要单独进行管理,是为了在多数据源、多集群等情况下进行复用。
  • 标签管理:目前是Prometheuslabel,也是为了方便复用巡检项巡检项标签可以灵活进行组合。
  • 任务编排:编排各种巡检任务。
  • 执行作业:配置定时的巡检作业,它由多个编排的任务组成。
  • 巡检报告:便于查看、导出巡检结果。
  • 巡检通知:巡检结果可以通知到企业微信群,便于业务方快速知道目前整个系统有没有问题。

效果

数据源管理

(1)添加数据源

(2)数据源列表

巡检项管理

(1)添加巡检项

(2)巡检项列表

标签管理

(1)添加标签

(2)标签列表

任务编排

(1)创建任务编排

(2)任务列表

执行作业

(1)创建执行作业

(2)作业列表

巡检报告

每次巡检完成都会生成对应的巡检报告。

点击详情可以看到巡检的具体结果。

点击导出,即可将报告导出为PDF。

如果配置了巡检通知,则会将对应的巡检结果发送到企业微信群。

代码实现

大部分的代码都是普通的CRUD,比如数据源的管理、巡检项的管理都是基础的CRUD,没有什么好说的。

这里简单说一下具体巡检的实现。

(1)当用户创建了执行作业且该作业处于开启状态,就会创建一个定时任务。

// CreateCronTask 创建定时任务

func (inspectionExecutionJobService *InspectionExecutionJobService) CreateCronTask(job *AutoInspection.InspectionExecutionJob) error {
cronName := fmt.Sprintf("InspectionExecution_%d", job.ID)
taskName := fmt.Sprintf("InspectionExecution_%d", job.ID)
// 检查是否已存在相同的定时任务
if _, found := global.GVA_Timer.FindTask(cronName, taskName); found {
// 如果已存在,先清除旧的定时任务
global.GVA_Timer.Clear(cronName)
}
// 创建定时任务
var option []cron.Option
option = append(option, cron.WithSeconds())
// 添加定时任务
if _, err := global.GVA_Timer.AddTaskByFunc(cronName, job.CronExpr, func() {
// 执行巡检任务
inspectionExecutionJobService.ExecuteInspectionJob(job)
}, taskName, option...); err != nil {
global.GVA_LOG.Error("创建定时任务失败", zap.Error(err), zap.Uint("jobID", job.ID))
return err
}
// 更新下次执行时间
nextTime := inspectionExecutionJobService.calculateNextRunTime(job.CronExpr)
job.NextRunTime = &nextTime
// 更新数据库中的记录
return global.GVA_DB.Model(job).Updates(map[string]interface{}{
"next_run_time": job.NextRunTime,
}).Error
}

Tips:因为是采用的gin-vue-admin框架,所以直接使用框架自带的timer定时器。

(2)当执行时间到了,就会执行ExecuteInspectionJob巡检任务。

func (inspectionExecutionJobService *InspectionExecutionJobService) ExecuteInspectionJob(job *AutoInspection.InspectionExecutionJob) {
// 更新作业执行时间
inspectionExecutionJobService.updateJobExecutionTime(job)
// 创建执行记录
jobExecution := inspectionExecutionJobService.createJobExecution(job)
if jobExecution == nil {
return
}
// 执行所有关联的巡检任务并收集结果
allResults := inspectionExecutionJobService.executeAllInspectionTasks(job, jobExecution)
global.GVA_LOG.Info("执行完成", zap.Any("results", allResults))
// 更新执行记录状态和结果
inspectionExecutionJobService.updateJobExecutionResult(jobExecution, allResults)
// 发送通知
if *job.IsNotice {
inspectionExecutionJobService.sendInspectionNotification(job, jobExecution, allResults)
}
}

这里主要是executeAllInspectionTasks来执行巡检任务。

// executeAllInspectionTasks 执行所有关联的巡检任务并收集结果

func (inspectionExecutionJobService *InspectionExecutionJobService) executeAllInspectionTasks(job *AutoInspection.InspectionExecutionJob, jobExecution *AutoInspection.JobExecution) []*result.ProductsResult {
// 创建一个等待组来同步所有巡检任务
var wg sync.WaitGroup
// 创建一个互斥锁来保护结果集
var mu sync.Mutex
// 创建一个结果集合
allResults := make([]*result.ProductsResult, 0)
// 执行所有关联的巡检任务
for _, jobID := range job.JobIds {
wg.Add(1)
gofunc(id uint) {
defer wg.Done()
// 执行单个巡检任务并获取结果
result := inspectionExecutionJobService.executeSingleInspectionTask(id, jobExecution)
if result != nil {
// 将结果添加到总结果集中
mu.Lock()
allResults = append(allResults, result)
mu.Unlock()
}
}(jobID)
}
// 等待所有巡检任务完成
wg.Wait()
return allResults
}

它会把作业中的任务拆成单个任务,然后由executeSingleInspectionTask分别执行并收集执行结果。

// executeSingleInspectionTask 执行单个巡检任务
func (inspectionExecutionJobService *InspectionExecutionJobService) executeSingleInspectionTask(jobID uint, jobExecution *AutoInspection.JobExecution) *result.ProductsResult {
global.GVA_LOG.Info("执行巡检任务", zap.Uint("jobID", jobID))
// 获取巡检任务信息
inspectionJob, _ := inspectionJobService.GetInspectionJob(fmt.Sprintf("%d", jobID))
// 创建结果通道
resultCh := make(chan *result.ProductsResult)
// 创建一个用于等待结果的WaitGroup
var resultWg sync.WaitGroup
resultWg.Add(1)
// 用于存储结果的变量
var taskResult *result.ProductsResult
// 启动一个goroutine来接收结果
gofunc() {
defer resultWg.Done()
result := <-resultCh
global.GVA_LOG.Info("巡检任务执行完成",
zap.String("jobName", inspectionJob.Name),
zap.Any("result", result))
// 保存结果
taskResult = result
}()
// 执行巡检任务
inspectionExecutionJobService.ExecuteInspectionTask(&inspectionJob, jobExecution, resultCh)
// 等待结果接收完成
resultWg.Wait()
return taskResult

}

ExecuteInspectionTask中是为了方便扩展数据源。

func (inspectionExecutionJobService *InspectionExecutionJobService) ExecuteInspectionTask(inspectionJob *AutoInspection.InspectionJob, jobExecution *AutoInspection.JobExecution, resultCh chan *result.ProductsResult) {

switch inspectionJob.DataSourceType {
case "prometheus":
// 执行Prometheus巡检任务
inspectionExecutionJobService.ExecutePrometheusInspectionTask(inspectionJob, jobExecution, resultCh)
}
}

由于目前只有Prometheus数据源,所以将直接执行ExecutePrometheusInspectionTask。在这个方法中主要是构造Prometheus规则然后进行巡检。

// ExecutePrometheusInspectionTask 执行Prometheus巡检任务
func (inspectionExecutionJobService *InspectionExecutionJobService) ExecutePrometheusInspectionTask(inspectionJob *AutoInspection.InspectionJob, jobExecution *AutoInspection.JobExecution, resultCh chan *result.ProductsResult) {
// 执行Prometheus巡检任务的逻辑
var inspectionItemsService InspectionItemsService
var inspectionTagService InspectionTagService
var dataSourceService DataSourceService

// 获取数据源信息
dataSource, _ := dataSourceService.GetDataSource(fmt.Sprintf("%d", inspectionJob.DataSourceId))

// 创建规则列表
prometheusRules := make([]*product.PrometheusRule, 0, len(inspectionJob.ItemLabelMaps))

// 遍历巡检项与标签映射关系
for _, itemLabelMap := range inspectionJob.ItemLabelMaps {
// 获取巡检项信息
inspectionItem, _ := inspectionItemsService.GetInspectionItems(fmt.Sprintf("%d", itemLabelMap.ItemId))

// 获取标签信息
var inspectionTag AutoInspection.InspectionTag
if itemLabelMap.LabelId != 0 {
inspectionTag, _ = inspectionTagService.GetInspectionTag(fmt.Sprintf("%d", itemLabelMap.LabelId))
}

// 创建Prometheus规则
prometheusRule := &product.PrometheusRule{
Name: inspectionItem.Name,
Rule: inspectionItem.Rule,
LabelFilter: inspectionTag.Label,
Desc: inspectionItem.Description,
AlertInfo: inspectionItem.OutputTemplate,
DataSourceName: dataSource.Name,
}

// 添加到规则列表
prometheusRules = append(prometheusRules, prometheusRule)

}

// 创建规则集合
rules := product.Rules{
Prometheus: prometheusRules,
AliyunSafe: []*product.AliyunSafeRule{}, // 空列表,因为这里只处理Prometheus规则
}

// 创建产品
prod := &product.Product{
Name: inspectionJob.Name,
Rules: rules,
}

// 使用defer和recover捕获可能的panic
deferfunc() {
if r := recover(); r != nil {
// 记录panic信息
global.GVA_LOG.Error("执行巡检任务发生panic",
zap.Any("panic", r),
zap.String("jobName", inspectionJob.Name))

// 创建一个表示失败的结果并发送到结果通道
pr := &result.ProductsResult{ProductName: inspectionJob.Name}

// 为每个规则创建失败结果
for _, rule := range prometheusRules {
errorMsg := fmt.Sprintf("巡检执行失败: %v", r)
failureResult := result.NewRuleResult(
result.WithInspectionInfo(rule.Name),
result.WithInspectionResult(result.ABNORMAL),
result.WithInspectionErrorInfo(
[]map[string]string{{
"error": errorMsg,
"rule": rule.Rule,
}},
"执行规则 {{rule}} 时发生错误: {{error}}",
),
)
pr.Add(failureResult)
}

// 发送结果
resultCh <- pr
}
}()

// 执行巡检
err = prod.Run(resultCh)
if err != nil {
global.GVA_LOG.Error("执行巡检任务失败", zap.Error(err), zap.String("jobName", inspectionJob.Name))
return
}

global.GVA_LOG.Info("巡检任务已启动", zap.String("jobName", inspectionJob.Name))
}

prod.Run中,会去做真正的指标数据查询。

func (p *Product) Run(resultCh chan *result.ProductsResult) error {
global.GVA_LOG.Info(fmt.Sprintf("开始巡检, %s", p.Name))
pr := &result.ProductsResult{ProductName: p.Name}
// prometheus巡检规则
for _, prometheusRule := range p.Rules.Prometheus {
ruleInspectRes, err := prometheusRule.Run()
if err != nil {
return err
}
pr.Add(ruleInspectRes)
}
resultCh <- pr
returnnil
}

然后调用prometheusRule.Run获取结果。

func (r *PrometheusRule) Run() (*result.RuleResult, error) {
ds, err := datasource.GetByName(r.DataSourceName)
if err != nil {
returnnil, err
}
pds, ok := ds.(*datasource.PrometheusDataSource)
if !ok {
returnnil, fmt.Errorf("数据源类型错误: %s 不是Prometheus数据源", r.DataSourceName)
}
if pds.Client == nil {
returnnil, fmt.Errorf("数据源为空: %s", r.DataSourceName)
}
res, err := pds.Run(r.Rule, r.LabelFilter)
if err != nil {
returnnil, err
}
ruleRes := r.buildRuleResult(res)
return ruleRes, nil
}

func (r *PrometheusRule) buildRuleResult(resultLabels []map[string]string) *result.RuleResult {
iflen(resultLabels) == 0 {
return result.NewRuleResult(result.WithInspectionInfo(fmt.Sprintf("%s", r.Name)),
result.WithInspectionResult(result.NORMAL))
}
return result.NewRuleResult(result.WithInspectionInfo(fmt.Sprintf("%s", r.Name)),
result.WithInspectionResult(result.ABNORMAL),
result.WithInspectionErrorInfo(resultLabels, r.AlertInfo))
}

具体的查询是封装在pds.Run中的,它会去调用Prometheus的接口去查询数据。

func Query(client api.Client, rule string) (model.Value, []string, error) {
// 添加空指针检查
if client == nil {
returnnil, nil, errors.New("Prometheus client is nil")
}
v1Api := promV1.NewAPI(client)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
value, warnings, err := v1Api.Query(ctx, rule, time.Now(), promV1.WithTimeout(10*time.Second))
global.GVA_LOG.Debug("查询结果", zap.String("value", value.String()), zap.Any("warnings", warnings))
if err != nil {
returnnil, nil, errors.WithStack(err)
}
return value, warnings, nil
}

(3)如果需要发送到企业微信,就会构建发送结果进行发送。

func (inspectionExecutionJobService *InspectionExecutionJobService) sendInspectionNotification(job *AutoInspection.InspectionExecutionJob, jobExecution *AutoInspection.JobExecution, results []*result.ProductsResult) {
// 获取通知配置
var notifyService = NotifyService{}
notify, err := notifyService.GetNotify(fmt.Sprintf("%d", job.NoticdId))
if err != nil {
global.GVA_LOG.Error("获取通知配置失败", zap.Error(err))
return
}

// 构建通知内容
// 1. 巡检摘要
taskCount := len(results) // 巡检任务数量
itemCount := 0 // 巡检项数量
normalCount := 0 // 正常项数量
abnormalCount := 0 // 异常项数量
abnormalItems := []string{} // 异常项列表

// 统计巡检项、正常项和异常项的数量
for _, task := range results {
itemCount += len(task.SubRuleResults)
for _, item := range task.SubRuleResults {
if item.InspectionResult == result.NORMAL {
normalCount++
} elseif item.InspectionResult == result.ABNORMAL {
abnormalCount++
// 收集异常项信息
abnormalDetail := fmt.Sprintf("【%s】%s", task.ProductName, item.InspectionInfo)
iflen(item.InspectionErrorInfo) > 0 {
abnormalDetail += "\n" + strings.Join(item.InspectionErrorInfo, "\n")
}
abnormalItems = append(abnormalItems, abnormalDetail)
}
}
}

// 格式化摘要信息
summary := fmt.Sprintf("巡检任务%d个,巡检项%d个,正常%d个,异常%d个", taskCount, itemCount, normalCount, abnormalCount)

// 构建企业微信通知内容
var content string
if notify.TemplateType == "markdown" {
// Markdown格式
content = fmt.Sprintf(`{
"msgtype": "markdown",
"markdown": {
"content": "# 自动化巡检结果通知\n\n> ### 执行作业:%s\n> ### 执行时间:%s\n> ### 执行结果:%s\n\n### **异常项列表:**\n%s"
}
}`
,
jobExecution.ExecutionJobName,
jobExecution.EndTime.Format("2006-01-02 15:04:05"),
summary,
formatAbnormalItems(abnormalItems))
} else {
// 文本格式
content = fmt.Sprintf(`{
"msgtype": "text",
"text": {
"content": "巡检结果通知\n执行作业:%s\n执行时间:%s\n执行结果:%s\n\n异常项列表:\n%s"
}
}`
,
jobExecution.ExecutionJobName,
jobExecution.EndTime.Format("2006-01-02 15:04:05"),
summary,
formatAbnormalItemsText(abnormalItems))
}

// 发送通知
ctx := context.Background()
sendParams := sender.SendParams{
NoticeType: notify.Type,
NoticeId: fmt.Sprintf("%d", notify.ID),
NoticeName: notify.Name,
Hook: notify.Address,
Content: content,
}

err = sender.Sender(&ctx, sendParams)
if err != nil {
global.GVA_LOG.Error("发送巡检通知失败", zap.Error(err))
return
}

global.GVA_LOG.Info("发送巡检通知成功",
zap.String("jobName", jobExecution.ExecutionJobName),
zap.String("summary", summary))
}

(4)PDF导出是用wkhtmltopdf实现,该包依赖服务器上的wkhtmltopdf命令。

func (jobExecutionService *JobExecutionService) GeneratePDF(jobExecution *AutoInspection.JobExecution) (string, error) {

pdf, err := wkhtmltopdf.NewPDFGenerator()
if err != nil {
global.GVA_LOG.Error("PDF生成器初始化失败", zap.Error(err))
return"", err
}

// 设置全局选项
pdf.Dpi.Set(300)
pdf.Orientation.Set(wkhtmltopdf.OrientationPortrait)
pdf.PageSize.Set(wkhtmltopdf.PageSizeA4)
pdf.MarginTop.Set(20)
pdf.MarginBottom.Set(20)
pdf.MarginLeft.Set(20)
pdf.MarginRight.Set(20)

// 渲染HTML模板
htmlContent, err := jobExecutionService.renderHTMLTemplate(jobExecution)
if err != nil {
global.GVA_LOG.Error("HTML模板渲染失败", zap.Error(err))
return"", err
}

// 创建一个页面并添加到生成器
page := wkhtmltopdf.NewPageReader(bytes.NewBufferString(htmlContent))
pdf.AddPage(page)

// 生成PDF
err = pdf.Create()
if err != nil {
return"", err
}

basePath := "uploads/pdf"
// 创建目录(如果不存在)
if err = os.MkdirAll(basePath, 0755); err != nil {
global.GVA_LOG.Error("创建PDF保存目录失败", zap.Error(err))
return"", err
}

filename := generatePDFFileName(jobExecution)
filePath := filepath.Join(basePath, filename)

// 3. 保存PDF到文件
if err = os.WriteFile(filePath, pdf.Bytes(), 0644); err != nil {
global.GVA_LOG.Error("保存PDF文件失败", zap.Error(err))
return"", err
}

....

return downloadURL, nil
}

以上就是实现巡检的主要代码。

最后

大部分企业虽然都有监控告警,但是自动化巡检在日常的运维工作中还是必要的,它可以聚合目前系统、集群存在的问题,避免遗漏告警信息。另外,在AI发展迅猛的今天,可以把AI也结合到自动化巡检中,比如在巡检中增加一些AI预测,AI故障诊断、AI根因分析等功能。



最后,求关注。如果你还想看更多优质原创文章,欢迎关注我们的公众号「运维开发故事」。

如果我的文章对你有所帮助,还请帮忙一下,你的支持会激励我输出更高质量的文章,非常感谢!

你还可以把我的公众号设为「星标」,这样当公众号文章更新时,你会在第一时间收到推送消息,避免错过我的文章更新。






相关推荐

如何高效实现API接口的自动化测试?

实现API接口的自动化测试是一个多步骤的过程,涉及需求分析、测试用例设计、环境搭建、脚本编写、执行测试、结果分析和持续集成等多个环节。选择合适的工具和框架也是成功的关键。嘿,咱来聊聊实现API接口自动...

总结100+前端优质库,让你成为前端百事通

1年多时间,陆陆续续整理了一些常用且实用的开源项目,方便大家更高效地学习和工作.js相关库js常用工具类「lodash」一个一致性、模块化、高性能的JavaScript实用工具库。「xij...

混合开发到底怎么个混法?(混合开发rn)

引言最近几年混合开发越来越火,从PhoneGap到Cordova到Ionic,再到ReactNative,到Flutter。同时在搜索引擎中诸如IonicVSReactNativeRN和Weex+...

无所不能,将 Vue 渲染到嵌入式液晶屏

该文章转载自公众号@前端时刻,https://mp.weixin.qq.com/s/WDHW36zhfNFVFVv4jO2vrA前言之前看了雪碧大佬的将React渲染到嵌入式液晶屏觉得很有意思,R...

【直接收藏】前端 VUE 高阶面试题(一)

说说vue动态权限绑定渲染列表(权限列表渲染)首先请求服务器,获取当前用户的权限数据,比如请求this.$http.get("rights/list");获取到权限数据之后,在列表中...

Vue采用虚拟DOM的目的是什么?(vue2 虚拟dom)

虚拟DOM更新其实效率并不像大家想象中的那么高,而且React官方也从来没说过虚拟DOM效率有多高,相反React虚拟DOM的实现也不是所有虚拟DOM产品中最好的。但是通过虚拟D...

什么是 JavaScript?(什么是党的旗帜)

本文首发自「慕课网」,想了解更多IT干货内容,程序员圈内热闻,欢迎关注!作者|慕课网精英讲师然冬JavaScript(JS)是一种具有函数优先的轻量级,解释型或即时编译型的编程语言。(MDN...

Weex在内涵发现页中的工程实践(weex唯客交易所官网)

React-Native和Weex是目前最为火热的两个客户端跨平台解决方案。从去年2016年9月份开始,IES在抖音产品中应用了ReactNative,中途遇到了很多的问题,尤其是长列表的性能问题一...

新恒汇:公司主要业务包括智能卡业务、蚀刻引线框架业务以及物联网eSIM芯片封测业务

证券日报网讯新恒汇7月3日在互动平台回答投资者提问时表示,公司主要业务包括智能卡业务、蚀刻引线框架业务以及物联网eSIM芯片封测业务。具体请关注公司公告和公开披露信息。(编辑王雪儿)...

“移”科普——什么是物联网?(移动设备物联网物联网应用实例)

物联网(InternetofThings,简称IoT)是指通过互联网将物理世界与数字世界相连接,实现物与物之间的智能互联的网络。它是一种新型的信息通信技术,通过传感器、嵌入式系统、网络技术等手段,...

如何自己搭建一个物联网平台?(自建物联网云平台)

自己搭建一个物联网(IoT)平台需要涉及多个关键步骤,包括硬件设备的选择、软件开发、网络通信、安全性设计以及数据管理。以下是搭建物联网平台的基本流程:1.确定物联网平台架构一个完整的物联网平台通常包...

物联网数据接入篇-应用层 Modbus(5)

前四篇文章讲述的是TCP/IP模型中的网络接口层、网络层、传输层、应用层一,这里到了第四层应用层二。由于协议比较多,就分开篇来介绍。这篇讲Modbus协议,后面再讲MQTT协议、CoAP协议、...

乐鑫ESP32-C5全面量产:行业首款双频Wi-Fi 6的RISC-V SoC

IT之家5月2日消息,乐鑫信息科技4月30日宣布,ESP32-C5现已全面进入量产。ESP32-C5宣称是行业首款2.4&5GHz双频Wi-Fi6的RISC-...

Vue Shop Admin:强大而易用的后台管理系统模板

VueShopAdmin是一个基于Vue.js框架的后台管理系统模板。它具有简洁、易用和美观的特点,非常适合开发人员用于快速构建各种类型的管理系统。这个模板使用了最新的技术,如Vue3、V...

基于Prometheus的自动化巡检(prometheus自动发现详解)

!!大家好,我是乔克,一个爱折腾的运维工程,一个睡觉都被自己丑醒的云原生爱好者。作者:乔克公众号:运维开发故事道路千万条,安全第一条。操作不规范,运维两行泪。前言目前,大部分公司都采用Promet...

取消回复欢迎 发表评论: