ci-workflows/scripts/infra-monitor.py
feibisi 929d23c26d
All checks were successful
Go 项目 CI / ci (push) Has been skipped
gitleaks 密钥泄露扫描 / gitleaks (push) Successful in -8h1m17s
TypeScript/JS 项目 CI / ci (push) Has been skipped
WordPress 插件 CI / ci (push) Has been skipped
fix: 修复 infra-monitor 误报(容器/SSL/域名)
- 空容器列表跳过 docker ps 检查(cravatar-prod/wptea-prod)
- cravatar.cn → cn.cravatar.com(直接检查目标,跳过 301)
- 移除已停用的 fonts.wptea.com(网站+SSL)
- 网站检查改用默认 SSL 上下文(CERT_NONE 导致部分服务器 TLS 握手失败)
- 品牌名修正:微小朵 → 薇晓朵

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-02-20 14:15:29 +08:00

494 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""基础设施监控脚本
检查项:
- Forgejo 服务健康
- Forgejo Runner 状态
- VPS 磁盘空间(全部生产服务器)
- 备份新鲜度
- 镜像同步状态
- NAS 可达性
- 生产网站真实可用性(内容校验,非仅状态码)
- SSL 证书到期预警
- Docker 容器健康
- WordPress 安全检查
告警渠道:
- 共享上下文 /mnt/shared-context/alerts/
- Forgejo Issue(持久化追踪)
- 控制台输出
"""
import json
import os
import re
import ssl
import socket
import subprocess
import sys
import urllib.request
import urllib.error
from datetime import datetime, timedelta
from pathlib import Path
# Base URL of the Forgejo instance; overridable via the FORGEJO_URL environment variable.
FORGEJO_URL = os.environ.get("FORGEJO_URL", "https://feicode.com")
# API token read from the environment; an empty string when FORGEJO_TOKEN is unset.
FORGEJO_TOKEN = os.environ.get("FORGEJO_TOKEN", "")
# Directory that receives the per-day alert markdown files.
ALERT_DIR = Path("/mnt/shared-context/alerts")
# Repository ("owner/name") in which alert issues are opened.
ALERT_REPO = "WenPai-org/ci-workflows"
# Thresholds
# Disk usage percentage at or above which a mount is reported as a problem.
DISK_WARN_PCT = 80
# Intended maximum age, in hours, of the newest Forgejo backup.
BACKUP_MAX_AGE_HOURS = 30
# Mirrors whose last sync is older than this many days are reported as stale.
MIRROR_STALE_DAYS = 3
# Certificates expiring in fewer than this many days are reported.
SSL_WARN_DAYS = 14
class Check:
    """Outcome of a single monitoring check.

    Attributes:
        name: Human-readable label of the check.
        ok: True while the check is considered passing.
        message: One-line summary shown in reports.
        details: Optional multi-line context attached by ``fail``.
    """

    def __init__(self, name):
        # A fresh check counts as passing until fail() says otherwise.
        self.name, self.ok = name, True
        self.message = self.details = ""

    def fail(self, message, details=""):
        """Record a failure with its summary and details; return self for chaining."""
        self.ok, self.message, self.details = False, message, details
        return self

    def success(self, message=""):
        """Record a pass with an optional summary; return self for chaining.

        ``details`` is intentionally left as it was.
        """
        self.ok, self.message = True, message
        return self
def api_get(path):
    """Send a token-authenticated GET request to the Forgejo REST API.

    Args:
        path: Endpoint relative to ``/api/v1/`` (may carry a query string).

    Returns:
        The decoded JSON payload, or None if the request or the decoding
        fails for any reason.
    """
    request = urllib.request.Request(
        f"{FORGEJO_URL}/api/v1/{path}",
        headers={"Authorization": f"token {FORGEJO_TOKEN}"},
    )
    try:
        with urllib.request.urlopen(request, timeout=15) as response:
            raw = response.read()
        return json.loads(raw)
    except Exception:
        # Best effort: callers treat None as "API unavailable".
        return None
def run_cmd(cmd, timeout=10):
    """Run a shell command and return ``(exit_code, output)``.

    Args:
        cmd: Command line executed through the shell. It must come from
            trusted configuration only, because it is interpreted by the
            shell (pipes and quoting are relied upon by the callers).
        timeout: Maximum run time in seconds.

    Returns:
        A tuple ``(returncode, text)``. ``text`` is the stripped stdout; when
        the command fails without writing anything to stdout, the stripped
        stderr is returned instead so callers can surface the real error
        (for example an ssh connection failure). On timeout the result is
        ``(-1, "timeout")``; on any other launch error ``(-1, str(error))``.
    """
    try:
        proc = subprocess.run(
            cmd, shell=True, capture_output=True, text=True, timeout=timeout
        )
    except subprocess.TimeoutExpired:
        return -1, "timeout"
    except Exception as exc:
        return -1, str(exc)

    output = proc.stdout.strip()
    if proc.returncode != 0 and not output:
        # Failed commands usually explain themselves on stderr only.
        output = proc.stderr.strip()
    return proc.returncode, output
# === Production server configuration ===
# Keyed by server name. Each entry provides:
#   host        - ssh target used for the remote disk and container checks
#   sites       - pages to fetch; "expect" must occur (case-insensitively)
#                 in the downloaded page body
#   containers  - container names expected in `docker ps`; an empty list
#                 skips the container check for that server
#   disks       - mount points whose usage is checked with `df`
PROD_SERVERS = {
    "weixiaoduo-prod": {
        "host": "weixiaoduo-prod",
        "sites": [
            {
                "url": "https://www.weixiaoduo.com",
                "expect": "weixiaoduo",
                "name": "薇晓朵主站",
            },
            {
                "url": "https://www.feibisi.com",
                "expect": "feibisi",
                "name": "飞比斯",
            },
            {
                "url": "https://tongji.feibisi.com",
                "expect": "matomo",
                "name": "Matomo统计",
            },
        ],
        "containers": [
            "matomo-app",
            "matomo-mysql",
            "postgresql_an4p-postgresql_an4p-1",
        ],
        "disks": ["/", "/www"],
    },
    "feicode-prod": {
        "host": "feicode-prod",
        "sites": [
            {
                "url": "https://feicode.com",
                "expect": "feicode",
                "name": "feiCode",
            },
        ],
        "containers": [
            "forgejo_FeiCode-1",
            "wenpai-bridge",
            "forgejo-runner-01",
        ],
        "disks": ["/"],
    },
    "wptea-prod": {
        "host": "wptea-prod",
        "sites": [
            {
                "url": "https://wptea.com",
                "expect": "wptea",
                "name": "WP茶馆",
            },
        ],
        "containers": [],
        "disks": ["/"],
    },
    "cravatar-prod": {
        "host": "cravatar-prod",
        "sites": [
            {
                "url": "https://cravatar.com",
                "expect": "cravatar",
                "name": "Cravatar",
            },
            {
                "url": "https://cn.cravatar.com",
                "expect": "cravatar",
                "name": "Cravatar CN",
            },
        ],
        "containers": [],
        "disks": ["/"],
    },
}
# Domains whose TLS certificate expiry is checked (key domains).
SSL_DOMAINS = [
    "www.weixiaoduo.com",
    "www.feibisi.com",
    "tongji.feibisi.com",
    "feicode.com",
    "wptea.com",
    "cravatar.com",
    "cn.cravatar.com",
]
# === 检查函数 ===
def check_forgejo_health():
    """Confirm the Forgejo API answers by querying its ``version`` endpoint.

    Returns:
        A Check that passes with the reported version, or fails when the API
        gives no usable answer.
    """
    check = Check("Forgejo 服务")
    info = api_get("version")
    if not info or "version" not in info:
        return check.fail("Forgejo API 无响应")
    return check.success(f"v{info['version']}")
def check_runner():
    """Check that the user-level ``forgejo-runner`` systemd unit is active.

    Returns:
        A passing Check when ``systemctl --user is-active`` exits 0 and prints
        ``active``; otherwise a failing Check carrying the reported state.
    """
    check = Check("Forgejo Runner")
    exit_code, state = run_cmd("systemctl --user is-active forgejo-runner")
    is_active = exit_code == 0 and state == "active"
    if is_active:
        return check.success("active")
    return check.fail(f"Runner 状态: {state}")
def _parse_df_row(line):
    """Return ``(usage_pct, used, size)`` from one ``df -h`` data row, or None.

    The usage column is located by its ``NN%`` shape rather than by a fixed
    index, so a row whose device name was wrapped onto the previous line
    (leaving only size/used/avail/use%/mount) is still understood.
    """
    fields = line.split()
    for idx, field in enumerate(fields):
        match = re.fullmatch(r"(\d+)%", field)
        # Size, Used and Avail must precede the percentage column.
        if match and idx >= 3:
            return int(match.group(1)), fields[idx - 2], fields[idx - 3]
    return None


def check_disk(server_name, server_cfg):
    """Check disk usage of every configured mount point on one server.

    Args:
        server_name: Label used in the check name.
        server_cfg: Server entry providing ``host`` (ssh target) and
            ``disks`` (mount points).

    Returns:
        A failing Check when any mount is at or above ``DISK_WARN_PCT``,
        cannot be queried, or yields output that cannot be parsed; otherwise
        a passing Check listing each mount's usage.
    """
    c = Check(f"磁盘 {server_name}")
    host = server_cfg["host"]
    problems = []
    healthy = []
    for mount in server_cfg["disks"]:
        code, out = run_cmd(
            f"ssh -o ConnectTimeout=5 {host} 'df -h {mount} | tail -1'", timeout=15
        )
        if code != 0:
            problems.append(f"{mount}: 无法检查")
            continue
        row = _parse_df_row(out)
        if row is None:
            # The remote pipeline exits 0 even when df itself failed, so an
            # unreadable row must count as a problem instead of passing.
            problems.append(f"{mount}: 无法解析 df 输出")
            continue
        usage_pct, used, size = row
        if usage_pct >= DISK_WARN_PCT:
            problems.append(f"{mount} {usage_pct}% ({used}/{size})")
        else:
            healthy.append(f"{mount} {usage_pct}%")
    if problems:
        return c.fail("; ".join(problems), "\n".join(healthy))
    return c.success("; ".join(healthy))
def _parse_ls_mtime(month, day, time_or_year, now):
    """Convert the date columns of an ``ls -l`` row to a naive datetime, or None.

    ``ls`` prints ``Mon DD HH:MM`` for recent files and ``Mon DD YYYY`` for
    older ones. The first form carries no year, so the current year is
    assumed and rolled back by one when that would land in the future.
    """
    try:
        if ":" in time_or_year:
            stamp = datetime.strptime(
                f"{now.year} {month} {day} {time_or_year}", "%Y %b %d %H:%M"
            )
            if stamp > now + timedelta(days=1):
                stamp = stamp.replace(year=now.year - 1)
            return stamp
        return datetime.strptime(f"{time_or_year} {month} {day}", "%Y %b %d")
    except ValueError:
        return None


def check_backup():
    """Check that a recent Forgejo dump exists on the NAS.

    The newest ``forgejo-dump-*.zip`` (last row of the ``ls`` listing) is
    inspected. Its modification time is compared with
    ``BACKUP_MAX_AGE_HOURS``.

    Returns:
        A failing Check when the NAS cannot be queried, no dump exists, or
        the newest dump is older than the threshold; otherwise a passing
        Check describing the dump. A row whose date cannot be parsed still
        passes, so an unexpected ``ls`` format does not raise false alarms.
    """
    c = Check("Forgejo 备份")
    code, out = run_cmd(
        "ssh -o ConnectTimeout=5 nas 'ls -lh /volume1/Download/backups/forgejo/latest/forgejo-dump-*.zip 2>/dev/null | tail -1'",
        timeout=15
    )
    if code != 0:
        return c.fail("无法检查 NAS 备份", out)
    listing = out.strip()
    if not listing:
        return c.fail("NAS 上无备份文件")

    now = datetime.now()
    max_age = timedelta(hours=BACKUP_MAX_AGE_HOURS)
    for line in listing.split("\n"):
        if ".zip" not in line:
            continue
        parts = line.split()
        # -rw------- 1 user group 181M Feb 19 10:02 forgejo-dump-20260219.zip
        if len(parts) < 9:
            continue
        size, fname = parts[4], parts[8]
        date_str = f"{parts[5]} {parts[6]} {parts[7]}"
        summary = f"{fname} ({size}, {date_str})"
        # NOTE(review): assumes the NAS clock and this host share a timezone.
        mtime = _parse_ls_mtime(parts[5], parts[6], parts[7], now)
        if mtime is not None and now - mtime > max_age:
            age_hours = int((now - mtime).total_seconds() // 3600)
            return c.fail(
                f"最新备份已超过 {BACKUP_MAX_AGE_HOURS} 小时(约 {age_hours} 小时前)",
                summary,
            )
        return c.success(summary)
    return c.success("有备份文件")
def check_mirror_sync():
    """Report mirror repositories that have not synced recently.

    Queries the repository search API for mirrors and compares each
    ``mirror_updated`` timestamp with ``MIRROR_STALE_DAYS``. Timestamps are
    compared as timezone-aware values, so a UTC (``Z``) or offset timestamp
    is not skewed by this host's local timezone; a timestamp without offset
    is interpreted as local time.

    Returns:
        A failing Check when the list cannot be fetched or any mirror is
        stale (first ten names in the details); otherwise a passing Check
        with the number of repositories inspected.
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    c = Check("镜像同步")
    data = api_get("repos/search?mirror=true&limit=50&sort=updated&order=asc")
    if not data:
        return c.fail("无法获取镜像列表")
    repos = data.get("data", data) if isinstance(data, dict) else data
    if not isinstance(repos, list):
        # e.g. an error object without a "data" key
        return c.fail("无法获取镜像列表")

    cutoff = datetime.now(timezone.utc) - timedelta(days=MIRROR_STALE_DAYS)
    stale = []
    for r in repos:
        if not isinstance(r, dict):
            continue
        full_name = r.get("full_name", "(unknown)")
        updated = r.get("mirror_updated") or ""
        if updated.startswith("0001-01-01"):
            # Zero timestamp: the mirror has never completed a sync.
            stale.append(f"{full_name}(从未同步)")
            continue
        if not updated:
            continue
        text = updated[:-1] + "+00:00" if updated.endswith("Z") else updated
        # Drop fractional seconds, which older fromisoformat() rejects.
        text = re.sub(r"\.\d+", "", text, count=1)
        try:
            synced_at = datetime.fromisoformat(text).astimezone(timezone.utc)
        except (ValueError, OverflowError):
            continue
        if synced_at < cutoff:
            stale.append(full_name)
    if stale:
        return c.fail(f"{len(stale)} 个镜像超过 {MIRROR_STALE_DAYS} 天未同步",
                      "\n".join(stale[:10]))
    return c.success(f"全部正常(检查了 {len(repos)} 个)")
def check_nas():
    """Check that the NAS answers over ssh and report usage of ``/volume1``.

    Returns:
        A failing Check when the ssh command exits non-zero; otherwise a
        passing Check with ``use% (used/size)`` or, if the ``df`` row has
        fewer than five fields, a plain reachability note.
    """
    check = Check("NAS 存储")
    exit_code, df_row = run_cmd(
        "ssh -o ConnectTimeout=5 nas 'df -h /volume1 | tail -1'", timeout=15
    )
    if exit_code != 0:
        return check.fail("NAS 不可达", df_row)
    fields = df_row.split()
    if len(fields) < 5:
        return check.success("可达")
    size, used, pct = fields[1], fields[2], fields[4]
    return check.success(f"{pct} ({used}/{size})")
def check_website(site_cfg):
    """Verify that a site really serves its page, not merely a status code.

    Args:
        site_cfg: Mapping with ``name`` (label), ``url`` and ``expect``
            (text that must occur, case-insensitively, in the first
            50000 bytes of the response body).

    Returns:
        A failing Check on HTTP/connection errors, a 5xx status, missing
        expected text, or a known error marker in the page; otherwise a
        passing Check with the HTTP status.
    """
    check = Check(f"网站 {site_cfg['name']}")
    url = site_cfg["url"]
    needle = site_cfg["expect"].lower()

    # (lower-case marker searched in the page, description used in the alert)
    error_markers = (
        ("fatal error", "PHP Fatal Error"),
        ("database connection", "数据库连接失败"),
        ("error establishing", "数据库连接失败"),
        ("502 bad gateway", "502 网关错误"),
        ("503 service", "503 服务不可用"),
        ("maintenance mode", "维护模式"),
        ("parse error", "PHP Parse Error"),
        ("warning:</b>", "PHP Warning"),
    )

    try:
        request = urllib.request.Request(
            url,
            headers={"User-Agent": "InfraMonitor/1.0", "Accept": "text/html"},
        )
        with urllib.request.urlopen(request, timeout=15) as response:
            status = response.status
            raw = response.read(50000)
        page = raw.decode("utf-8", errors="replace").lower()
    except urllib.error.HTTPError as exc:
        return check.fail(f"HTTP {exc.code}", url)
    except urllib.error.URLError as exc:
        return check.fail(f"连接失败: {exc.reason}", url)
    except Exception as exc:
        return check.fail(f"异常: {str(exc)[:80]}", url)

    # HTTP status code
    if status >= 500:
        return check.fail(f"HTTP {status}", url)
    # Expected content
    if needle not in page:
        return check.fail(
            f"页面无预期内容 '{site_cfg['expect']}'", f"HTTP {status}, {url}"
        )
    # Common error signatures; the first match wins
    found = next((label for marker, label in error_markers if marker in page), None)
    if found is not None:
        return check.fail(f"页面包含错误: {found}", url)
    return check.success(f"HTTP {status}")
def check_ssl_cert(domain):
    """Check how many days remain before a domain's TLS certificate expires.

    Connects to port 443 with the default (verifying) SSL context and reads
    the certificate's ``notAfter`` field.

    Args:
        domain: Host name to connect to; also used for SNI and verification.

    Returns:
        A failing Check when the certificate has expired, expires in fewer
        than ``SSL_WARN_DAYS`` days, fails verification, or the connection
        fails; otherwise a passing Check with the remaining days.
    """
    c = Check(f"SSL {domain}")
    try:
        ctx = ssl.create_default_context()
        with socket.create_connection((domain, 443), timeout=10) as sock:
            with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
                cert = ssock.getpeercert()
        # notAfter is expressed in GMT, e.g. 'Nov 12 13:35:49 2026 GMT'.
        # Convert it through epoch seconds so it is compared with the local
        # clock on the same basis instead of being read as local time.
        expire_ts = ssl.cert_time_to_seconds(cert["notAfter"])
        expire_date = datetime.fromtimestamp(expire_ts)
        days_left = (expire_date - datetime.now()).days
        expire_day = expire_date.strftime("%Y-%m-%d")
        if days_left < 0:
            return c.fail(f"已过期 {abs(days_left)} 天")
        if days_left < SSL_WARN_DAYS:
            return c.fail(f"{days_left} 天后过期 ({expire_day})")
        return c.success(f"{days_left} 天 ({expire_day})")
    except ssl.SSLCertVerificationError as e:
        return c.fail(f"证书验证失败: {str(e)[:60]}")
    except Exception as e:
        return c.fail(f"检查失败: {str(e)[:60]}")
def check_containers(server_name, server_cfg):
    """Check that the expected Docker containers are up on one server.

    Args:
        server_name: Label used in the check name.
        server_cfg: Server entry providing ``host`` (ssh target) and
            ``containers`` (expected container names).

    Returns:
        A passing Check when no containers are configured or all expected
        containers are listed by ``docker ps`` with an ``Up`` status that is
        not marked unhealthy; otherwise a failing Check naming the missing
        containers and those with an abnormal status.
    """
    c = Check(f"容器 {server_name}")
    host = server_cfg["host"]
    expected = server_cfg["containers"]
    if not expected:
        return c.success("无容器配置,跳过")
    code, out = run_cmd(
        f"ssh -o ConnectTimeout=5 {host} 'docker ps --format \"{{{{.Names}}}}:{{{{.Status}}}}\"'",
        timeout=15,
    )
    if code != 0:
        return c.fail("无法检查容器状态", out)

    # One "name:status" row per listed container.
    running = {}
    for line in out.splitlines():
        name, sep, status = line.partition(":")
        if sep:
            running[name] = status.strip()

    down = [name for name in expected if name not in running]
    # `docker ps` also lists containers that are restarting or unhealthy;
    # being listed is not enough to call a container healthy.
    abnormal = [
        f"{name} ({running[name]})"
        for name in expected
        if name in running
        and (
            not running[name].startswith("Up")
            or "unhealthy" in running[name].lower()
        )
    ]

    problems = []
    if down:
        problems.append(f"容器未运行: {', '.join(down)}")
    if abnormal:
        problems.append(f"容器状态异常: {', '.join(abnormal)}")
    if problems:
        return c.fail("; ".join(problems))
    return c.success(f"{len(expected)} 个容器正常")
def check_wp_security():
    """Run a quick webshell scan of the WordPress uploads directory.

    Looks over ssh for ``*.php`` files under the uploads directory, ignoring
    ``index.php`` and a few known plugin directories. The remote command
    caps the listing at five entries.

    Returns:
        A failing Check when the scan cannot run or any file is listed;
        otherwise a passing Check.
    """
    check = Check("WP 安全")

    excluded_dirs = ("wpallimport", "cravatar", "learndash")
    path_filters = " ".join(f'! -path "*/{d}/*"' for d in excluded_dirs)
    remote_find = (
        "find /www/wwwroot/www.weixiaoduo.com/wp-content/uploads/ "
        '-name "*.php" ! -name "index.php" '
        f"{path_filters} "
        "-type f 2>/dev/null | head -5"
    )
    exit_code, listing = run_cmd(
        f"ssh -o ConnectTimeout=5 weixiaoduo-prod '{remote_find}'",
        timeout=20,
    )

    if exit_code != 0:
        return check.fail("无法扫描", listing)
    hits = listing.strip()
    if not hits:
        return check.success("上传目录无可疑 PHP 文件")
    files = hits.split("\n")
    return check.fail(
        f"上传目录发现 {len(files)} 个可疑 PHP 文件",
        "\n".join(files[:5]),
    )
# === 告警输出 ===
def write_alert(checks):
    """Write the failed checks to today's alert file in the shared context.

    The file ``<ALERT_DIR>/<YYYY-MM-DD>-infra.md`` is (re)written with one
    section per failed check. Nothing is written when every check passed.

    Args:
        checks: Iterable of check results exposing ``ok``, ``name``,
            ``message`` and ``details``.
    """
    failures = [c for c in checks if not c.ok]
    if not failures:
        return

    # One timestamp for both heading and file name, so they cannot disagree
    # when the run straddles midnight.
    moment = datetime.now()
    ALERT_DIR.mkdir(parents=True, exist_ok=True)
    alert_file = ALERT_DIR / f"{moment.strftime('%Y-%m-%d')}-infra.md"

    sections = [f"# 基础设施告警 {moment.strftime('%Y-%m-%d %H:%M')}\n\n"]
    for c in failures:
        section = f"## {c.name}\n{c.message}\n"
        if c.details:
            section += f"```\n{c.details}\n```\n"
        sections.append(section + "\n")

    # Explicit UTF-8: the content is Chinese and must not depend on the
    # locale's default encoding.
    alert_file.write_text("".join(sections), encoding="utf-8")
    print(f"告警已写入 {alert_file}")
def create_alert_issue(checks):
    """Open a Forgejo issue summarising the current failures.

    Does nothing when every check passed or when no API token is configured.
    The issue body holds a status table of all checks followed by the
    details of each failed check. Errors while creating the issue are
    printed, not raised.

    Args:
        checks: Iterable of check results exposing ``ok``, ``name``,
            ``message`` and ``details``.
    """
    failures = [c for c in checks if not c.ok]
    if not failures or not FORGEJO_TOKEN:
        return
    now = datetime.now().strftime("%Y-%m-%d %H:%M")
    title = f"基础设施告警: {', '.join(c.name for c in failures)} ({now})"
    if len(title) > 200:
        # NOTE(review): issue titles are presumably length-limited by the
        # server; with many failures fall back to a count so the request is
        # not rejected. Confirm the exact limit.
        title = f"基础设施告警: {len(failures)} 项检查失败 ({now})"

    parts = ["| 状态 | 检查项 | 详情 |\n|------|--------|------|\n"]
    for c in checks:
        # Pass/fail marker; both branches were the same empty string.
        icon = "✅" if c.ok else "❌"
        parts.append(f"| {icon} | {c.name} | {c.message} |\n")
    parts.append("\n")
    for c in failures:
        if c.details:
            parts.append(f"### {c.name}\n```\n{c.details}\n```\n\n")
    body = "".join(parts)

    payload = json.dumps({"title": title, "body": body}).encode()
    url = f"{FORGEJO_URL}/api/v1/repos/{ALERT_REPO}/issues"
    req = urllib.request.Request(
        url, data=payload,
        headers={"Authorization": f"token {FORGEJO_TOKEN}", "Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            data = json.loads(resp.read())
        print(f"告警 Issue #{data.get('number')} 已创建")
    except Exception as e:
        # Best effort: a failed notification must not abort the monitor run.
        print(f"创建告警 Issue 失败: {e}")
def _purge_old_alerts(max_age_days=7):
    """Delete alert files in ALERT_DIR last modified more than max_age_days ago."""
    if not ALERT_DIR.exists():
        return
    for alert_path in ALERT_DIR.glob("*-infra.md"):
        try:
            age = datetime.now() - datetime.fromtimestamp(alert_path.stat().st_mtime)
            if age.days > max_age_days:
                alert_path.unlink()
        except OSError:
            # Best-effort cleanup: a file that vanished or cannot be removed
            # is simply left for a later run.
            continue


def main():
    """Run every check, print a report and raise alerts on failure.

    Exits with status 1 when at least one check failed (after writing the
    alert file and opening an issue). When everything passes, alert files
    older than seven days are removed.
    """
    print(f"=== 基础设施监控 {datetime.now().strftime('%Y-%m-%d %H:%M')} ===\n")
    checks = []

    # --- feiCode platform ---
    print(" [feiCode 平台]")
    checks.append(check_forgejo_health())
    checks.append(check_runner())
    checks.append(check_backup())
    checks.append(check_mirror_sync())
    checks.append(check_nas())

    # --- Production server disks ---
    print(" [生产服务器]")
    checks.extend(check_disk(name, cfg) for name, cfg in PROD_SERVERS.items())

    # --- Docker containers ---
    checks.extend(check_containers(name, cfg) for name, cfg in PROD_SERVERS.items())

    # --- Website availability ---
    print(" [网站可用性]")
    for cfg in PROD_SERVERS.values():
        checks.extend(check_website(site) for site in cfg["sites"])

    # --- SSL certificates ---
    print(" [SSL 证书]")
    checks.extend(check_ssl_cert(domain) for domain in SSL_DOMAINS)

    # --- WordPress security ---
    print(" [安全检查]")
    checks.append(check_wp_security())

    # --- Report ---
    print()
    for c in checks:
        # Pass/fail marker; both branches were the same empty string.
        icon = "✅" if c.ok else "❌"
        print(f" {icon} {c.name}: {c.message}")
        if not c.ok and c.details:
            for line in c.details.split("\n")[:5]:
                print(f" {line}")

    failures = [c for c in checks if not c.ok]
    print(f"\n结果: {len(checks) - len(failures)}/{len(checks)} 通过")
    if failures:
        write_alert(checks)
        create_alert_issue(checks)
        sys.exit(1)

    # Remove alert files older than 7 days
    _purge_old_alerts(7)
    print("全部正常。")


if __name__ == "__main__":
    main()