Monitoring is a critical part of running reliable software, yet many teams only discover an outage after user complaints start rolling in. Imagine getting a Slack message at 2 PM telling you your API has been down for over an hour, and nobody noticed until customers started complaining.

In this tutorial, I'll walk you through building a status monitoring application from scratch that can:

- Probe your services (HTTP, TCP, DNS, etc.) on a schedule
- Detect outages and send alerts to various communication channels (Teams, Slack, etc.)
- Track incidents with automatic open/close
- Expose metrics for Prometheus and Grafana dashboards
- Run in Docker

For this application I'll use Go: it's fast, compiles to a single binary for cross-platform support, and handles concurrency well, which matters for an application that needs to monitor many endpoints at once.

## What We're Building

We'll build a Go application called StatusD. It reads a config file with a list of services to monitor, probes them, creates incidents, and fires notifications when things go wrong.

Tech stack used:

- Go
- PostgreSQL
- Grafana (with Prometheus for metrics)
- Docker
- Nginx

Here's the high-level architecture:

```
┌────────────────────────────────────────────────────────────────┐
│                         Docker Compose                         │
│  ┌──────────┐ ┌──────────┐ ┌───────────┐ ┌──────────────────┐  │
│  │ Postgres │ │Prometheus│ │  Grafana  │ │      Nginx       │  │
│  │    DB    │ │ (metrics)│ │(dashboard)│ │ (reverse proxy)  │  │
│  └────┬─────┘ └────┬─────┘ └─────┬─────┘ └────────┬─────────┘  │
│       │            │             │                │            │
│       └────────────┴────────┬────┴────────────────┘            │
│                   ┌─────────┴─────────┐                        │
│                   │      StatusD      │                        │
│                   │   (our Go app)    │                        │
│                   └─────────┬─────────┘                        │
└─────────────────────────────┼──────────────────────────────────┘
                              │
             ┌────────────────┼────────────────┐
             ▼                ▼                ▼
         ┌────────┐       ┌────────┐       ┌────────┐
         │Service │       │Service │       │Service │
         │   A    │       │   B    │       │   C    │
         └────────┘       └────────┘       └────────┘
```

## Project Structure

Before we write any code, let's see how the pieces fit together.

```
status-monitor/
├── cmd/statusd/
│   └── main.go              # Application entry point
├── internal/
│   ├── models/
│   │   └── models.go        # Data structures (Asset, Incident, etc.)
│   ├── probe/
│   │   ├── probe.go         # Probe registry
│   │   └── http.go          # HTTP probe implementation
│   ├── scheduler/
│   │   └── scheduler.go     # Worker pool and scheduling
│   ├── alert/
│   │   └── engine.go        # State machine and notifications
│   ├── notifier/
│   │   └── teams.go         # Teams/Slack integration
│   ├── store/
│   │   └── postgres.go      # Database layer
│   ├── api/
│   │   └── handlers.go      # REST API
│   └── config/
│       └── manifest.go      # Config loading
├── config/
│   ├── manifest.json        # Services to monitor
│   └── notifiers.json       # Notification channels
├── migrations/
│   └── 001_init_schema.up.sql
├── docker-compose.yml
├── Dockerfile
└── entrypoint.sh
```

## Core Data Models

Here we'll define our types, which is to say, we'll define what a "monitored service" looks like. We'll define four of them:

- **Asset**: a service we want to monitor.
- **ProbeResult**: the outcome of checking an asset; response, latency, and so on.
- **Incident**: tracks when something breaks, i.e. when a ProbeResult returns an unexpected response (and when the service recovers).
- **Notification**: an alert or message sent to a configured communication channel such as Teams, Slack, or email.

Let's define the types in code:

```go
// internal/models/models.go
package models

import "time"

// Asset represents a monitored service
type Asset struct {
	ID                  string            `json:"id"`
	AssetType           string            `json:"assetType"` // http, tcp, dns, etc.
	Name                string            `json:"name"`
	Address             string            `json:"address"`
	IntervalSeconds     int               `json:"intervalSeconds"`
	TimeoutSeconds      int               `json:"timeoutSeconds"`
	ExpectedStatusCodes []int             `json:"expectedStatusCodes,omitempty"`
	Metadata            map[string]string `json:"metadata,omitempty"`
}

// ProbeResult contains the outcome of a single health check
type ProbeResult struct {
	AssetID   string
	Timestamp time.Time
	Success   bool
	LatencyMs int64
	Code      int    // HTTP status code
	Message   string // Error message if failed
}

// Incident tracks a service outage
type Incident struct {
	ID        string
	AssetID   string
	StartedAt time.Time
	EndedAt   *time.Time // nil if still open
	Severity  string
	Summary   string
}

// Notification is what we send to Slack/Teams
type Notification struct {
	AssetID   string
	AssetName string
	Event     string // "DOWN", "RECOVERY", "UP"
	Timestamp time.Time
	Details   string
}
```

Note the `ExpectedStatusCodes` field: not every endpoint returns 200. Some return 204 or redirects, and this field lets you define what "healthy" means for each service.

## Database Schema

We need a place to store probe results and incidents. We'll use PostgreSQL for this. Here's our schema:

```sql
-- migrations/001_init_schema.up.sql
CREATE TABLE IF NOT EXISTS assets (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    address TEXT NOT NULL,
    asset_type TEXT NOT NULL DEFAULT 'http',
    interval_seconds INTEGER DEFAULT 300,
    timeout_seconds INTEGER DEFAULT 5,
    expected_status_codes TEXT,
    metadata JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS probe_events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    asset_id TEXT NOT NULL REFERENCES assets(id),
    timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
    success BOOLEAN NOT NULL,
    latency_ms BIGINT NOT NULL,
    code INTEGER,
    message TEXT
);

CREATE TABLE IF NOT EXISTS incidents (
    -- UUID rather than SERIAL so it matches the string ID in the Go model
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    asset_id TEXT NOT NULL REFERENCES assets(id),
    severity TEXT DEFAULT 'INITIAL',
    summary TEXT,
    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    ended_at TIMESTAMP
);

-- Indexes for common queries
CREATE INDEX IF NOT EXISTS idx_probe_events_asset_id_timestamp
    ON probe_events(asset_id, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_incidents_asset_id ON incidents(asset_id);
CREATE INDEX IF NOT EXISTS idx_incidents_ended_at ON incidents(ended_at);
```

The key insight here is the `probe_events(asset_id, timestamp DESC)` index: indexing by asset and timestamp in descending order makes "recent probe results for service X" queries fast.
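The alert engine and the API below both depend on a `store.Store` interface. This tutorial doesn't list the `internal/store` package, so here's a sketch of the interface implied by how the other packages call it; the Postgres implementation in `postgres.go` would satisfy it:

```go
// internal/store/store.go (a sketch inferred from usage elsewhere in this post)
package store

import (
	"context"

	"github.com/yourname/status/internal/models"
)

// Store abstracts the persistence layer so the engine and API
// don't depend on Postgres directly.
type Store interface {
	// Assets
	GetAssets(ctx context.Context) ([]models.Asset, error)

	// Probe history
	SaveProbeEvent(ctx context.Context, result models.ProbeResult) error
	GetProbeEvents(ctx context.Context, assetID string, limit int) ([]models.ProbeResult, error)

	// Incident lifecycle
	CreateIncident(ctx context.Context, assetID, summary string) (string, error)
	CloseIncident(ctx context.Context, incidentID string) error
	GetOpenIncidents(ctx context.Context) ([]models.Incident, error)
}
```

Keeping this an interface also makes the engine and API trivially testable with an in-memory fake.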
## Building the Probe System

We want to support probing multiple protocol types, HTTP, TCP, DNS, and so on, without writing a sprawling switch statement.

First, let's define what a probe looks like:

```go
// internal/probe/probe.go
package probe

import (
	"context"
	"fmt"

	"github.com/yourname/status/internal/models"
)

// Probe defines the interface for checking service health
type Probe interface {
	Probe(ctx context.Context, asset models.Asset) (models.ProbeResult, error)
}

// registry holds all probe types
var registry = make(map[string]func() Probe)

// Register adds a probe type to the registry
func Register(assetType string, factory func() Probe) {
	registry[assetType] = factory
}

// GetProbe returns a probe for the given asset type
func GetProbe(assetType string) (Probe, error) {
	factory, ok := registry[assetType]
	if !ok {
		return nil, fmt.Errorf("unknown asset type: %s", assetType)
	}
	return factory(), nil
}
```

Now the HTTP probe implementation:

```go
// internal/probe/http.go
package probe

import (
	"context"
	"io"
	"net/http"
	"time"

	"github.com/yourname/status/internal/models"
)

func init() {
	Register("http", func() Probe { return &httpProbe{} })
}

type httpProbe struct{}

func (p *httpProbe) Probe(ctx context.Context, asset models.Asset) (models.ProbeResult, error) {
	result := models.ProbeResult{
		AssetID:   asset.ID,
		Timestamp: time.Now(),
	}

	client := &http.Client{
		Timeout: time.Duration(asset.TimeoutSeconds) * time.Second,
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, asset.Address, nil)
	if err != nil {
		result.Success = false
		result.Message = err.Error()
		return result, err
	}

	start := time.Now()
	resp, err := client.Do(req)
	result.LatencyMs = time.Since(start).Milliseconds()

	if err != nil {
		result.Success = false
		result.Message = err.Error()
		return result, err
	}
	defer resp.Body.Close()

	// Drain the body (capped at 1MB) so the connection can be reused
	io.Copy(io.Discard, io.LimitReader(resp.Body, 1024*1024))

	result.Code = resp.StatusCode

	// Check whether the status code is one we expect
	if len(asset.ExpectedStatusCodes) > 0 {
		for _, code := range asset.ExpectedStatusCodes {
			if code == resp.StatusCode {
				result.Success = true
				return result, nil
			}
		}
		result.Success = false
		result.Message = "unexpected status code"
	} else {
		result.Success = resp.StatusCode < 400
	}

	return result, nil
}
```

The `init()` function runs automatically when the Go application starts, adding the HTTP probe to the registry without any code changes elsewhere. Want to add a TCP probe? Create `tcp.go`, implement the interface, and register it in `init()`, as in the sketch below.
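For instance, here's what that TCP probe might look like. This file isn't part of the tutorial's listings; it's a minimal sketch that treats "the port accepts a TCP connection within the timeout" as healthy:

```go
// internal/probe/tcp.go (a sketch; not shown in the original tutorial)
package probe

import (
	"context"
	"net"
	"time"

	"github.com/yourname/status/internal/models"
)

func init() {
	Register("tcp", func() Probe { return &tcpProbe{} })
}

type tcpProbe struct{}

// Probe considers the asset healthy if a TCP connection to
// asset.Address (e.g. "db.example.com:5432") succeeds in time.
func (p *tcpProbe) Probe(ctx context.Context, asset models.Asset) (models.ProbeResult, error) {
	result := models.ProbeResult{
		AssetID:   asset.ID,
		Timestamp: time.Now(),
	}

	d := net.Dialer{Timeout: time.Duration(asset.TimeoutSeconds) * time.Second}

	start := time.Now()
	conn, err := d.DialContext(ctx, "tcp", asset.Address)
	result.LatencyMs = time.Since(start).Milliseconds()

	if err != nil {
		result.Message = err.Error()
		return result, err
	}
	conn.Close()

	result.Success = true
	return result, nil
}
```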
## Scheduling and Concurrency

We need to probe all of our assets on a schedule. For this we'll use a worker pool.

```go
// internal/scheduler/scheduler.go
package scheduler

import (
	"context"
	"sync"
	"time"

	"github.com/yourname/status/internal/models"
	"github.com/yourname/status/internal/probe"
)

type JobHandler func(result models.ProbeResult)

type Scheduler struct {
	workers int
	jobs    chan models.Asset
	tickers map[string]*time.Ticker
	handler JobHandler
	mu      sync.Mutex
	done    chan struct{}
	wg      sync.WaitGroup
}

func NewScheduler(workerCount int, handler JobHandler) *Scheduler {
	return &Scheduler{
		workers: workerCount,
		jobs:    make(chan models.Asset, 100),
		tickers: make(map[string]*time.Ticker),
		handler: handler,
		done:    make(chan struct{}),
	}
}

func (s *Scheduler) Start(ctx context.Context) {
	for i := 0; i < s.workers; i++ {
		s.wg.Add(1)
		go s.worker(ctx)
	}
}

func (s *Scheduler) ScheduleAssets(assets []models.Asset) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	for _, asset := range assets {
		interval := time.Duration(asset.IntervalSeconds) * time.Second
		ticker := time.NewTicker(interval)
		s.tickers[asset.ID] = ticker

		s.wg.Add(1)
		go s.scheduleAsset(asset, ticker)
	}
	return nil
}

func (s *Scheduler) scheduleAsset(asset models.Asset, ticker *time.Ticker) {
	defer s.wg.Done()
	for {
		select {
		case <-s.done:
			ticker.Stop()
			return
		case <-ticker.C:
			s.jobs <- asset
		}
	}
}

func (s *Scheduler) worker(ctx context.Context) {
	defer s.wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case <-s.done:
			return
		case asset := <-s.jobs:
			p, err := probe.GetProbe(asset.AssetType)
			if err != nil {
				continue
			}
			// A failed probe still returns a populated result
			// (Success=false plus the error message), so we pass it
			// to the handler either way.
			result, _ := p.Probe(ctx, asset)
			s.handler(result)
		}
	}
}

func (s *Scheduler) Stop() {
	// We deliberately never close s.jobs: a ticker goroutine caught
	// mid-send would panic on a closed channel. Closing done is enough;
	// both tickers and workers watch it.
	close(s.done)
	s.wg.Wait()
}
```

Each asset gets its own ticker that fires on the asset's own interval. When it's time to check an asset, the ticker pushes a probe job onto a channel, and only the workers pull from it. We don't run probes directly from the ticker goroutines because a probe can block while waiting on a network response or a timeout. With 4 workers and 100 assets, at most 4 probes run at any given moment, even if all the tickers fire at once. The `sync.WaitGroup` makes sure every goroutine shuts down cleanly.

## Incident Detection: A State Machine

When a probe result flips an asset's state, we act on it: going down opens an incident and fires an alert, and coming back up closes the incident and sends a recovery notice. It's a state machine: UP → DOWN → UP.

Let's build the engine:

```go
// internal/alert/engine.go
package alert

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/yourname/status/internal/models"
	"github.com/yourname/status/internal/store"
)

type NotifierFunc func(ctx context.Context, notification models.Notification) error

type AssetState struct {
	IsUp           bool
	LastProbeTime  time.Time
	OpenIncidentID string
}

type Engine struct {
	store      store.Store
	notifiers  map[string]NotifierFunc
	mu         sync.RWMutex
	assetState map[string]AssetState
}

func NewEngine(store store.Store) *Engine {
	return &Engine{
		store:      store,
		notifiers:  make(map[string]NotifierFunc),
		assetState: make(map[string]AssetState),
	}
}

func (e *Engine) RegisterNotifier(name string, fn NotifierFunc) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.notifiers[name] = fn
}

// Process holds the lock through the store and notifier calls for
// simplicity; a production system would release it before doing I/O.
func (e *Engine) Process(ctx context.Context, result models.ProbeResult, asset models.Asset) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	// Persist every probe result so the API can serve probe history
	if err := e.store.SaveProbeEvent(ctx, result); err != nil {
		return err
	}

	state, seen := e.assetState[result.AssetID]
	if !seen {
		// Treat previously unseen assets as up so a first successful
		// probe doesn't fire a spurious recovery notification
		state.IsUp = true
	}
	state.LastProbeTime = result.Timestamp

	// State hasn't changed? Nothing more to do.
	if state.IsUp == result.Success {
		e.assetState[result.AssetID] = state
		return nil
	}

	if result.Success {
		// Recovery!
		return e.handleRecovery(ctx, asset, state)
	}
	// Outage!
	return e.handleOutage(ctx, asset, state, result)
}

func (e *Engine) handleOutage(ctx context.Context, asset models.Asset, state AssetState, result models.ProbeResult) error {
	incidentID, err := e.store.CreateIncident(ctx, asset.ID, fmt.Sprintf("Service %s is down", asset.Name))
	if err != nil {
		return err
	}

	state.IsUp = false
	state.OpenIncidentID = incidentID
	e.assetState[asset.ID] = state

	notification := models.Notification{
		AssetID:   asset.ID,
		AssetName: asset.Name,
		Event:     "DOWN",
		Timestamp: result.Timestamp,
		Details:   result.Message,
	}
	return e.sendNotifications(ctx, notification)
}

func (e *Engine) handleRecovery(ctx context.Context, asset models.Asset, state AssetState) error {
	if state.OpenIncidentID != "" {
		e.store.CloseIncident(ctx, state.OpenIncidentID)
	}

	state.IsUp = true
	state.OpenIncidentID = ""
	e.assetState[asset.ID] = state

	notification := models.Notification{
		AssetID:   asset.ID,
		AssetName: asset.Name,
		Event:     "RECOVERY",
		Timestamp: time.Now(),
		Details:   "Service has recovered",
	}
	return e.sendNotifications(ctx, notification)
}

func (e *Engine) sendNotifications(ctx context.Context, notification models.Notification) error {
	for name, notifier := range e.notifiers {
		if err := notifier(ctx, notification); err != nil {
			fmt.Printf("notifier %s failed: %v\n", name, err)
		}
	}
	return nil
}
```

Key insight: we track state in the in-memory `assetState` map for fast lookups, but persist incidents to the database for durability. If the process restarts, we can rebuild the in-memory state from the open incidents.
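That rebuild step isn't shown in the tutorial, but given the `Store` interface sketched earlier it could be as small as this, called once at startup before scheduling begins (`Rehydrate` is a hypothetical method name):

```go
// internal/alert/engine.go (a sketched addition, not in the original post)

// Rehydrate restores in-memory state from open incidents after a restart,
// so a recovery probe closes the right incident instead of being a no-op.
func (e *Engine) Rehydrate(ctx context.Context) error {
	incidents, err := e.store.GetOpenIncidents(ctx)
	if err != nil {
		return err
	}

	e.mu.Lock()
	defer e.mu.Unlock()

	for _, inc := range incidents {
		e.assetState[inc.AssetID] = AssetState{
			IsUp:           false,
			OpenIncidentID: inc.ID,
		}
	}
	return nil
}
```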
## Sending Notifications

When something breaks, people need to know. We need to send notifications to the configured communication channels.

Let's define our Teams notifier:

```go
// internal/notifier/teams.go
package notifier

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	"github.com/yourname/status/internal/models"
)

type TeamsNotifier struct {
	webhookURL string
	client     *http.Client
}

func NewTeamsNotifier(webhookURL string) *TeamsNotifier {
	return &TeamsNotifier{
		webhookURL: webhookURL,
		client:     &http.Client{Timeout: 10 * time.Second},
	}
}

func (t *TeamsNotifier) Notify(ctx context.Context, n models.Notification) error {
	emoji := "🟢"
	if n.Event == "DOWN" {
		emoji = "🔴"
	}

	card := map[string]interface{}{
		"type": "message",
		"attachments": []map[string]interface{}{
			{
				"contentType": "application/vnd.microsoft.card.adaptive",
				"content": map[string]interface{}{
					"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
					"type":    "AdaptiveCard",
					"version": "1.4",
					"body": []map[string]interface{}{
						{
							"type":   "TextBlock",
							"text":   fmt.Sprintf("%s %s - %s", emoji, n.AssetName, n.Event),
							"weight": "Bolder",
							"size":   "Large",
						},
						{
							"type": "FactSet",
							"facts": []map[string]interface{}{
								{"title": "Service", "value": n.AssetName},
								{"title": "Status", "value": n.Event},
								{"title": "Time", "value": n.Timestamp.Format(time.RFC1123)},
								{"title": "Details", "value": n.Details},
							},
						},
					},
				},
			},
		},
	}

	body, err := json.Marshal(card)
	if err != nil {
		return err
	}

	req, err := http.NewRequestWithContext(ctx, "POST", t.webhookURL, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := t.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 300 {
		return fmt.Errorf("Teams webhook returned %d", resp.StatusCode)
	}
	return nil
}
```

Teams uses Adaptive Cards for rich formatting. You can define notifiers for other communication channels, such as Slack or Discord, in the same way.
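As an example, a Slack notifier could reuse the same shape with Slack's standard incoming-webhook payload. This is a sketch, not part of the tutorial's code; the file name and message format are illustrative:

```go
// internal/notifier/slack.go (a sketch; not shown in the original tutorial)
package notifier

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	"github.com/yourname/status/internal/models"
)

type SlackNotifier struct {
	webhookURL string
	client     *http.Client
}

func NewSlackNotifier(webhookURL string) *SlackNotifier {
	return &SlackNotifier{
		webhookURL: webhookURL,
		client:     &http.Client{Timeout: 10 * time.Second},
	}
}

func (s *SlackNotifier) Notify(ctx context.Context, n models.Notification) error {
	emoji := "🟢"
	if n.Event == "DOWN" {
		emoji = "🔴"
	}

	// Slack incoming webhooks accept a simple {"text": "..."} payload.
	payload := map[string]string{
		"text": fmt.Sprintf("%s *%s* is %s\n%s (%s)",
			emoji, n.AssetName, n.Event, n.Details,
			n.Timestamp.Format(time.RFC1123)),
	}

	body, err := json.Marshal(payload)
	if err != nil {
		return err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.webhookURL, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := s.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 300 {
		return fmt.Errorf("Slack webhook returned %d", resp.StatusCode)
	}
	return nil
}
```

Since both notifiers expose a `Notify` method matching `NotifierFunc`, registering one is just `engine.RegisterNotifier("slack", slack.Notify)`.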
## The REST API

We need endpoints to query the status of the services we're monitoring. For this we'll use chi, a lightweight router that supports route parameters like `/assets/{id}`.

Let's define the API:

```go
// internal/api/handlers.go
package api

import (
	"encoding/json"
	"net/http"

	"github.com/go-chi/chi/v5"
	"github.com/go-chi/chi/v5/middleware"

	"github.com/yourname/status/internal/store"
)

type Server struct {
	store store.Store
	mux   *chi.Mux
}

func NewServer(s store.Store) *Server {
	srv := &Server{store: s, mux: chi.NewRouter()}
	srv.mux.Use(middleware.Logger)
	srv.mux.Use(middleware.Recoverer)

	srv.mux.Route("/api", func(r chi.Router) {
		r.Get("/health", srv.health)
		r.Get("/assets", srv.listAssets)
		r.Get("/assets/{id}/events", srv.getAssetEvents)
		r.Get("/incidents", srv.listIncidents)
	})
	return srv
}

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	s.mux.ServeHTTP(w, r)
}

func (s *Server) health(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]string{"status": "healthy"})
}

func (s *Server) listAssets(w http.ResponseWriter, r *http.Request) {
	assets, err := s.store.GetAssets(r.Context())
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(assets)
}

func (s *Server) getAssetEvents(w http.ResponseWriter, r *http.Request) {
	id := chi.URLParam(r, "id")
	events, err := s.store.GetProbeEvents(r.Context(), id, 100)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(events)
}

func (s *Server) listIncidents(w http.ResponseWriter, r *http.Request) {
	incidents, err := s.store.GetOpenIncidents(r.Context())
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(incidents)
}
```

The code above defines a small HTTP API server exposing four read-only endpoints:

- `GET /api/health` - health check (is the service running?)
- `GET /api/assets` - list all monitored services
- `GET /api/assets/{id}/events` - probe history for a specific service
- `GET /api/incidents` - list open incidents
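The stack below has Prometheus scraping StatusD, but the tutorial never shows the instrumentation itself. Here's one way it could look, assuming the standard `prometheus/client_golang` library; the package, metric names, and the `Record`/`Handler` helpers are all illustrative:

```go
// internal/metrics/metrics.go (a sketch; not shown in the original tutorial)
package metrics

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	"github.com/yourname/status/internal/models"
)

var (
	probeLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "statusd_probe_latency_ms",
		Help:    "Probe latency in milliseconds.",
		Buckets: prometheus.ExponentialBuckets(1, 2, 12),
	}, []string{"asset_id"})

	probeUp = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "statusd_probe_up",
		Help: "1 if the last probe succeeded, 0 if it failed.",
	}, []string{"asset_id"})
)

// Record updates the metrics for one probe result; call it from the
// scheduler's JobHandler alongside the alert engine.
func Record(result models.ProbeResult) {
	probeLatency.WithLabelValues(result.AssetID).Observe(float64(result.LatencyMs))
	up := 0.0
	if result.Success {
		up = 1.0
	}
	probeUp.WithLabelValues(result.AssetID).Set(up)
}

// Handler exposes the default registry for Prometheus to scrape.
func Handler() http.Handler {
	return promhttp.Handler()
}
```

You'd mount it on the chi router with something like `srv.mux.Handle("/metrics", metrics.Handler())` and point Prometheus's scrape config at that path.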
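For completeness, here's roughly how `cmd/statusd/main.go` might tie the pieces together. The tutorial doesn't show this file; the flag names mirror `entrypoint.sh` below, and `store.NewPostgres`, `config.LoadManifest`, `config.LoadNotifiers`, and the notifier config shape are assumed helpers, not code from the post:

```go
// cmd/statusd/main.go (a sketch under the assumptions above)
package main

import (
	"context"
	"flag"
	"log"
	"net/http"

	"github.com/yourname/status/internal/alert"
	"github.com/yourname/status/internal/api"
	"github.com/yourname/status/internal/config"
	"github.com/yourname/status/internal/models"
	"github.com/yourname/status/internal/notifier"
	"github.com/yourname/status/internal/scheduler"
	"github.com/yourname/status/internal/store"
)

func main() {
	manifestPath := flag.String("manifest", "config/manifest.json", "assets to monitor")
	notifiersPath := flag.String("notifiers", "config/notifiers.json", "notification channels")
	dbConn := flag.String("db", "", "Postgres connection string")
	workers := flag.Int("workers", 4, "probe worker count")
	apiPort := flag.String("api-port", "8080", "HTTP API port")
	flag.Parse()

	ctx := context.Background()

	// Assumed constructor: opens the Postgres-backed store.Store.
	db, err := store.NewPostgres(ctx, *dbConn)
	if err != nil {
		log.Fatal(err)
	}

	engine := alert.NewEngine(db)

	// Assumed loader and config shape for notifiers.json.
	notifCfg, err := config.LoadNotifiers(*notifiersPath)
	if err != nil {
		log.Fatal(err)
	}
	teams := notifier.NewTeamsNotifier(notifCfg.Notifiers["teams"].WebhookURL)
	engine.RegisterNotifier("teams", teams.Notify)

	// Assumed loader for manifest.json.
	assets, err := config.LoadManifest(*manifestPath)
	if err != nil {
		log.Fatal(err)
	}

	// The scheduler's handler only receives the result, so keep an
	// asset lookup table for the engine.
	assetByID := make(map[string]models.Asset, len(assets))
	for _, a := range assets {
		assetByID[a.ID] = a
	}

	sched := scheduler.NewScheduler(*workers, func(result models.ProbeResult) {
		if err := engine.Process(ctx, result, assetByID[result.AssetID]); err != nil {
			log.Printf("process %s: %v", result.AssetID, err)
		}
	})
	sched.Start(ctx)
	if err := sched.ScheduleAssets(assets); err != nil {
		log.Fatal(err)
	}

	log.Fatal(http.ListenAndServe(":"+*apiPort, api.NewServer(db)))
}
```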
## Dockerizing the Application

Dockerizing the app is fairly straightforward since Go compiles to a single binary. We'll use a multi-stage build to keep the final image small:

```dockerfile
# Dockerfile
FROM golang:1.24-alpine AS builder
WORKDIR /app
RUN apk add --no-cache git
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o statusd ./cmd/statusd/

FROM alpine:latest
WORKDIR /app
RUN apk --no-cache add ca-certificates
COPY --from=builder /app/statusd .
COPY entrypoint.sh .
RUN chmod +x /app/entrypoint.sh
EXPOSE 8080
ENTRYPOINT ["/app/entrypoint.sh"]
```

The final stage is just Alpine plus our binary, typically under 20 MB.

The entrypoint script builds the database connection string from environment variables:

```sh
#!/bin/sh
# entrypoint.sh
DB_HOST=${DB_HOST:-localhost}
DB_PORT=${DB_PORT:-5432}
DB_USER=${DB_USER:-status}
DB_PASSWORD=${DB_PASSWORD:-status}
DB_NAME=${DB_NAME:-status_db}

DB_CONN_STRING="postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:${DB_PORT}/${DB_NAME}"

exec ./statusd \
    -manifest /app/config/manifest.json \
    -notifiers /app/config/notifiers.json \
    -db "$DB_CONN_STRING" \
    -workers 4 \
    -api-port 8080
```

## Docker Compose: Putting It All Together

One file to rule them all:

```yaml
# docker-compose.yml
version: "3.8"

services:
  postgres:
    image: postgres:15-alpine
    container_name: status_postgres
    environment:
      POSTGRES_USER: status
      POSTGRES_PASSWORD: changeme
      POSTGRES_DB: status_db
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./migrations:/docker-entrypoint-initdb.d
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U status"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - status_network

  statusd:
    build: .
    container_name: status_app
    environment:
      - DB_HOST=postgres
      - DB_PORT=5432
      - DB_USER=status
      - DB_PASSWORD=changeme
      - DB_NAME=status_db
    volumes:
      - ./config:/app/config:ro
    depends_on:
      postgres:
        condition: service_healthy
    networks:
      - status_network

  prometheus:
    image: prom/prometheus:latest
    container_name: status_prometheus
    volumes:
      - ./docker/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    networks:
      - status_network
    depends_on:
      - statusd

  grafana:
    image: grafana/grafana:latest
    container_name: status_grafana
    environment:
      GF_SECURITY_ADMIN_USER: admin
      GF_SECURITY_ADMIN_PASSWORD: admin
    volumes:
      - grafana_data:/var/lib/grafana
    networks:
      - status_network
    depends_on:
      - prometheus

  nginx:
    image: nginx:alpine
    container_name: status_nginx
    volumes:
      - ./docker/nginx/nginx.conf:/etc/nginx/nginx.conf:ro
      - ./docker/nginx/conf.d:/etc/nginx/conf.d:ro
    ports:
      - "80:80"
    depends_on:
      - statusd
      - grafana
      - prometheus
    networks:
      - status_network

networks:
  status_network:
    driver: bridge

volumes:
  postgres_data:
  prometheus_data:
  grafana_data:
```

A few things to note:

- **PostgreSQL healthcheck**: the statusd service waits for Postgres to actually be ready, not just started.
- **Config mount**: we mount `./config` as read-only. Edit your manifest locally and the running container sees the changes.
- **Nginx**: routes external traffic to the Grafana and Prometheus dashboards.
- Compose also mounts `./docker/prometheus.yml` and the Nginx configs from the host. Those files aren't listed in this tutorial, so you'll need to supply them; for Prometheus, a minimal scrape config pointing at `statusd:8080` is enough.

## Configuration Files

The application reads two files: `manifest.json` and `notifiers.json`.

The `manifest.json` file lists the assets we want to monitor. Each asset needs an ID, a probe type, and an address. The `intervalSeconds` field controls how often we check (60 = once per minute). `expectedStatusCodes` lets you define what "healthy" means. Some endpoints return 301 redirects or 204 No Content, and that's fine.

```json
// config/manifest.json
{
  "assets": [
    {
      "id": "api-prod",
      "assetType": "http",
      "name": "Production API",
      "address": "https://api.example.com/health",
      "intervalSeconds": 60,
      "timeoutSeconds": 5,
      "expectedStatusCodes": [200],
      "metadata": {
        "env": "prod",
        "owner": "platform-team"
      }
    },
    {
      "id": "web-prod",
      "assetType": "http",
      "name": "Production Website",
      "address": "https://www.example.com",
      "intervalSeconds": 120,
      "timeoutSeconds": 10,
      "expectedStatusCodes": [200, 301]
    }
  ]
}
```

The `notifiers.json` file controls where alerts go. You define notification channels (Teams, Slack), then set policies for which channels fire on which events. `throttleSeconds: 300` means you won't get spammed more than once every 5 minutes for the same issue.

```json
// config/notifiers.json
{
  "notifiers": {
    "teams": {
      "type": "teams",
      "webhookUrl": "https://outlook.office.com/webhook/your-webhook-url"
    }
  },
  "notificationPolicy": {
    "onDown": ["teams"],
    "onRecovery": ["teams"],
    "throttleSeconds": 300,
    "repeatAlerts": false
  }
}
```

## Running It

```sh
docker-compose up -d
```

That's it! Five services spin up:

- PostgreSQL stores your data
- StatusD probes your services
- Prometheus collects metrics
- Grafana visualizes them (http://localhost:80)
- Nginx routes everything

Check the logs:

```sh
docker logs -f status_app
```

You should see:

```
Loading assets manifest...
Loaded 2 assets
Loading notifiers config...
Loaded 1 notifiers
Connecting to database...
Starting scheduler...
[✓] Production API (api-prod): 45ms
[✓] Production Website (web-prod): 120ms
```

## Summary

You now have a monitoring system that:

- Reads services from a JSON config
- Probes them on a schedule using a worker pool
- Detects outages and creates incidents
- Sends notifications to Teams/Slack
- Exposes Prometheus metrics
- Runs in Docker with one command

This tutorial gets you to a working, deployed monitoring system. Production-grade tools have more under the hood, though:

- Circuit breakers to keep a failing service from triggering cascading failures
- Multi-tier escalation that alerts a manager if the on-call engineer doesn't respond
- Alert deduplication to prevent notification storms
- Adaptive probe intervals that check more frequently during an incident
- Hot-reloading configuration without restarting the service
- SLA calculation and compliance tracking