push
This commit is contained in:
442
app.py
Normal file
442
app.py
Normal file
@@ -0,0 +1,442 @@
|
||||
#!/usr/bin/env python3
|
||||
# /opt/pve-ha-web/app.py
|
||||
import os
|
||||
import re
|
||||
import shlex
|
||||
import socket
|
||||
import json
|
||||
import time
|
||||
import subprocess
|
||||
from typing import List, Dict, Any, Optional, Tuple
|
||||
from flask import Flask, request, jsonify, render_template
|
||||
|
||||
APP_TITLE = "PVE HA Panel"
|
||||
DEFAULT_NODE = socket.gethostname()
|
||||
HA_UNITS_START = ["watchdog-mux", "pve-ha-crm", "pve-ha-lrm"]
|
||||
HA_UNITS_STOP = list(reversed(HA_UNITS_START))
|
||||
|
||||
app = Flask(__name__, template_folder="templates", static_folder="static")
|
||||
|
||||
# ---------------- exec helpers ----------------
|
||||
def run(cmd: List[str], timeout: int = 25) -> subprocess.CompletedProcess:
|
||||
return subprocess.run(cmd, check=False, text=True, capture_output=True, timeout=timeout)
|
||||
|
||||
def get_text(cmd: List[str]) -> str:
|
||||
r = run(cmd)
|
||||
return r.stdout if r.returncode == 0 else ""
|
||||
|
||||
def get_json(cmd: List[str]) -> Any:
|
||||
if cmd and cmd[0] == "pvesh" and "--output-format" not in cmd:
|
||||
cmd = cmd + ["--output-format", "json"]
|
||||
r = run(cmd)
|
||||
if r.returncode != 0 or not r.stdout.strip():
|
||||
return None
|
||||
try:
|
||||
return json.loads(r.stdout)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def post_json(cmd: List[str]) -> Any:
|
||||
# force "create" for POST-like agent calls
|
||||
if cmd and cmd[0] == "pvesh" and len(cmd) > 2 and cmd[1] != "create":
|
||||
cmd = ["pvesh", "create"] + cmd[1:]
|
||||
r = run(cmd)
|
||||
if r.returncode != 0 or not r.stdout.strip():
|
||||
return None
|
||||
try:
|
||||
return json.loads(r.stdout)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def is_active(unit: str) -> bool:
|
||||
return run(["systemctl", "is-active", "--quiet", unit]).returncode == 0
|
||||
|
||||
def start_if_needed(unit: str, out: List[str]) -> None:
|
||||
if not is_active(unit):
|
||||
out.append(f"+ start {unit}")
|
||||
run(["systemctl", "start", unit])
|
||||
|
||||
def stop_if_running(unit: str, out: List[str]) -> None:
|
||||
if is_active(unit):
|
||||
out.append(f"- stop {unit}")
|
||||
run(["systemctl", "stop", unit])
|
||||
|
||||
def ha_node_maint(enable: bool, node: str, out: List[str]) -> None:
|
||||
cmd = ["ha-manager", "crm-command", "node-maintenance", "enable" if enable else "disable", node]
|
||||
out.append("$ " + " ".join(shlex.quote(x) for x in cmd))
|
||||
r = run(cmd)
|
||||
if r.returncode != 0:
|
||||
out.append(f"ERR: {r.stderr.strip()}")
|
||||
|
||||
# ---------------- collectors ----------------
|
||||
def get_pvecm_status() -> str: return get_text(["pvecm", "status"])
|
||||
def get_quorumtool(short: bool = True) -> str: return get_text(["corosync-quorumtool", "-s" if short else "-l"])
|
||||
def get_cfgtool() -> str: return get_text(["corosync-cfgtool", "-s"])
|
||||
def get_ha_status_raw() -> str: return get_text(["ha-manager", "status"])
|
||||
def get_pvesr_status() -> str: return get_text(["pvesr", "status"])
|
||||
|
||||
def votequorum_brief() -> Dict[str, Any]:
|
||||
out = get_quorumtool(True)
|
||||
rv: Dict[str, Any] = {}
|
||||
rx = {
|
||||
"expected": r"Expected votes:\s*(\d+)",
|
||||
"total": r"Total votes:\s*(\d+)",
|
||||
"quorum": r"Quorum:\s*(\d+)",
|
||||
"quorate": r"Quorate:\s*(Yes|No)"
|
||||
}
|
||||
for k, rgx in rx.items():
|
||||
m = re.search(rgx, out, re.I)
|
||||
rv[k] = (None if not m else (m.group(1).lower() if k == "quorate" else int(m.group(1))))
|
||||
out_l = get_quorumtool(False)
|
||||
lines = [ln for ln in out_l.splitlines() if re.match(r"^\s*\d+\s+\d+\s+\S+", ln)]
|
||||
rv["members"] = len(lines) if lines else None
|
||||
return rv
|
||||
|
||||
def api_cluster_data() -> Dict[str, Any]:
|
||||
return {
|
||||
"cluster_status": get_json(["pvesh", "get", "/cluster/status"]) or [],
|
||||
"ha_status": get_json(["pvesh", "get", "/cluster/ha/status"]) or [],
|
||||
"ha_resources": get_json(["pvesh", "get", "/cluster/ha/resources"]) or [],
|
||||
"ha_groups": get_json(["pvesh", "get", "/cluster/ha/groups"]) or [],
|
||||
"nodes": get_json(["pvesh", "get", "/nodes"]) or [],
|
||||
}
|
||||
|
||||
def enrich_nodes(nodes_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||
out: List[Dict[str, Any]] = []
|
||||
for n in nodes_list or []:
|
||||
name = n.get("node")
|
||||
if not name:
|
||||
out.append(n)
|
||||
continue
|
||||
detail = get_json(["pvesh", "get", f"/nodes/{name}/status"]) or {}
|
||||
if "loadavg" in detail: n["loadavg"] = detail["loadavg"]
|
||||
if "cpu" in detail: n["cpu"] = detail["cpu"]
|
||||
if "memory" in detail:
|
||||
n["mem"] = detail["memory"].get("used"); n["maxmem"] = detail["memory"].get("total")
|
||||
if "rootfs" in detail:
|
||||
n["rootfs"] = detail["rootfs"].get("used"); n["maxrootfs"] = detail["rootfs"].get("total")
|
||||
out.append(n)
|
||||
return out
|
||||
|
||||
# ---------------- ha-manager parser ----------------
|
||||
def parse_ha_manager(text: str) -> Dict[str, Any]:
|
||||
nodes: Dict[str, Dict[str, str]] = {}
|
||||
resources: Dict[str, Dict[str, str]] = {}
|
||||
current_node: Optional[str] = None
|
||||
for line in (text or "").splitlines():
|
||||
s = line.strip()
|
||||
m = re.match(r"node:\s+(\S+)\s+\(([^)]+)\)", s)
|
||||
if m:
|
||||
current_node = m.group(1)
|
||||
nodes.setdefault(current_node, {"node": current_node, "state": m.group(2)})
|
||||
continue
|
||||
m = re.match(r"(lrm|crm)\s+status:\s+(\S+)", s)
|
||||
if m and current_node:
|
||||
nodes[current_node]["lrm" if m.group(1)=="lrm" else "crm"] = m.group(2)
|
||||
continue
|
||||
m = re.match(r"service:\s+(\S+)\s+on\s+(\S+)\s+\(([^)]+)\)", s)
|
||||
if m:
|
||||
sid, node, flags = m.group(1), m.group(2), m.group(3)
|
||||
rec = {"sid": sid, "node": node, "flags": flags}
|
||||
rec["state"] = "started" if "started" in flags else ("stopped" if "stopped" in flags else rec.get("state"))
|
||||
resources[sid] = rec
|
||||
continue
|
||||
m = re.match(r"service:\s+(\S+)\s+\(([^)]*)\)\s+on\s+(\S+)", s)
|
||||
if m:
|
||||
sid, flags, node = m.group(1), m.group(2), m.group(3)
|
||||
rec = {"sid": sid, "node": node, "flags": flags}
|
||||
rec["state"] = "started" if "started" in flags else ("stopped" if "stopped" in flags else rec.get("state"))
|
||||
resources[sid] = rec
|
||||
continue
|
||||
m = re.match(r"service\s+(\S+):\s+(\S+)\s+on\s+(\S+)", s)
|
||||
if m:
|
||||
sid, st, node = m.group(1), m.group(2), m.group(3)
|
||||
resources[sid] = {"sid": sid, "state": st, "node": node}
|
||||
continue
|
||||
return {"nodes": list(nodes.values()), "resources": list(resources.values())}
|
||||
|
||||
# ---------------- SID utils / indexes ----------------
|
||||
def norm_sid(s: Optional[str]) -> Optional[str]:
|
||||
if not s: return None
|
||||
s = str(s)
|
||||
m = re.match(r"^(vm|ct):(\d+)$", s)
|
||||
if m: return f"{m.group(1)}:{m.group(2)}"
|
||||
m = re.match(r"^(qemu|lxc)/(\d+)$", s)
|
||||
if m: return ("vm" if m.group(1) == "qemu" else "ct") + f":{m.group(2)}"
|
||||
return s
|
||||
|
||||
def cluster_vmct_index() -> Dict[str, str]:
|
||||
items = get_json(["pvesh", "get", "/cluster/resources"]) or []
|
||||
idx: Dict[str, str] = {}
|
||||
for it in items:
|
||||
t = it.get("type")
|
||||
if t not in ("qemu", "lxc"): continue
|
||||
vmid = it.get("vmid"); node = it.get("node")
|
||||
sid = (("vm" if t=="qemu" else "ct") + f":{vmid}") if vmid is not None else norm_sid(it.get("id"))
|
||||
if sid and node: idx[sid] = node
|
||||
return idx
|
||||
|
||||
def cluster_vmct_meta() -> Dict[str, Dict[str, Any]]:
|
||||
items = get_json(["pvesh", "get", "/cluster/resources"]) or []
|
||||
meta: Dict[str, Dict[str, Any]] = {}
|
||||
for it in items:
|
||||
t = it.get("type")
|
||||
if t not in ("qemu", "lxc"): continue
|
||||
sid = norm_sid(it.get("id")) or (("vm" if t == "qemu" else "ct") + f":{it.get('vmid')}")
|
||||
if sid:
|
||||
meta[sid] = {
|
||||
"sid": sid, "type": t, "vmid": it.get("vmid"),
|
||||
"node": it.get("node"), "name": it.get("name"),
|
||||
"status": it.get("status"), "hastate": it.get("hastate")
|
||||
}
|
||||
return meta
|
||||
|
||||
def merge_resources(api_res: List[Dict[str, Any]],
|
||||
parsed_res: List[Dict[str, Any]],
|
||||
vmct_idx: Dict[str, str]) -> List[Dict[str, Any]]:
|
||||
by_sid: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
# seed from /cluster/ha/resources
|
||||
for r in (api_res or []):
|
||||
sid = norm_sid(r.get("sid"))
|
||||
if not sid:
|
||||
continue
|
||||
x = dict(r)
|
||||
x["sid"] = sid
|
||||
by_sid[sid] = x
|
||||
|
||||
# merge runtime from ha-manager
|
||||
for r in (parsed_res or []):
|
||||
sid = norm_sid(r.get("sid"))
|
||||
if not sid:
|
||||
continue
|
||||
x = by_sid.get(sid, {"sid": sid})
|
||||
for k, v in r.items():
|
||||
if k == "sid":
|
||||
continue
|
||||
if v not in (None, ""):
|
||||
x[k] = v
|
||||
by_sid[sid] = x
|
||||
|
||||
# fill node from /cluster/resources
|
||||
for sid, x in by_sid.items():
|
||||
if not x.get("node") and sid in vmct_idx:
|
||||
x["node"] = vmct_idx[sid]
|
||||
|
||||
return list(by_sid.values())
|
||||
|
||||
# ---------------- VM details ----------------
|
||||
def sid_to_tuple(sid: str, meta: Dict[str, Dict[str, Any]]) -> Optional[Tuple[str, int, str]]:
|
||||
sid_n = norm_sid(sid)
|
||||
if not sid_n: return None
|
||||
m = re.match(r"^(vm|ct):(\d+)$", sid_n)
|
||||
if not m: return None
|
||||
typ = "qemu" if m.group(1) == "vm" else "lxc"
|
||||
vmid = int(m.group(2))
|
||||
node = (meta.get(sid_n) or {}).get("node")
|
||||
return (typ, vmid, node)
|
||||
|
||||
def vm_detail_payload(sid: str) -> Dict[str, Any]:
|
||||
meta = cluster_vmct_meta()
|
||||
tup = sid_to_tuple(sid, meta)
|
||||
if not tup: return {"sid": sid, "error": "bad sid"}
|
||||
typ, vmid, node = tup
|
||||
if not node: return {"sid": sid, "error": "unknown node"}
|
||||
base = f"/nodes/{node}/{typ}/{vmid}"
|
||||
current = get_json(["pvesh", "get", f"{base}/status/current"]) or {}
|
||||
config = get_json(["pvesh", "get", f"{base}/config"]) or {}
|
||||
agent_info = None; agent_os = None; agent_ifaces = None
|
||||
if typ == "qemu":
|
||||
agent_info = get_json(["pvesh", "get", f"{base}/agent/info"])
|
||||
agent_os = post_json(["pvesh", "create", f"{base}/agent/get-osinfo"]) or None
|
||||
agent_ifaces = post_json(["pvesh", "create", f"{base}/agent/network-get-interfaces"]) or None
|
||||
return {
|
||||
"sid": norm_sid(sid), "node": node, "type": typ, "vmid": vmid,
|
||||
"meta": meta.get(norm_sid(sid), {}),
|
||||
"current": current, "config": config,
|
||||
"agent": {"info": agent_info, "osinfo": agent_os, "ifaces": agent_ifaces} if typ=="qemu" else None
|
||||
}
|
||||
|
||||
# ---------------- Node details ----------------
|
||||
def node_detail_payload(name: str) -> Dict[str, Any]:
|
||||
if not name: return {"error": "no node"}
|
||||
status = get_json(["pvesh", "get", f"/nodes/{name}/status"]) or {}
|
||||
version = get_json(["pvesh", "get", f"/nodes/{name}/version"]) or {}
|
||||
timeinfo = get_json(["pvesh", "get", f"/nodes/{name}/time"]) or {}
|
||||
services = get_json(["pvesh", "get", f"/nodes/{name}/services"]) or []
|
||||
network_cfg = get_json(["pvesh", "get", f"/nodes/{name}/network"]) or []
|
||||
netstat = get_json(["pvesh", "get", f"/nodes/{name}/netstat"]) or [] # may be empty on some versions
|
||||
disks = get_json(["pvesh", "get", f"/nodes/{name}/disks/list"]) or []
|
||||
subscription = get_json(["pvesh", "get", f"/nodes/{name}/subscription"]) or {}
|
||||
return {
|
||||
"node": name,
|
||||
"status": status,
|
||||
"version": version,
|
||||
"time": timeinfo,
|
||||
"services": services,
|
||||
"network_cfg": network_cfg,
|
||||
"netstat": netstat,
|
||||
"disks": disks,
|
||||
"subscription": subscription
|
||||
}
|
||||
|
||||
def node_ha_services(node: str) -> Dict[str, str]:
|
||||
svcs = get_json(["pvesh", "get", f"/nodes/{node}/services"]) or []
|
||||
def one(name: str) -> str:
|
||||
for s in svcs:
|
||||
if s.get("name") == name:
|
||||
st = s.get("state") or s.get("active") or ""
|
||||
return "active" if str(st).lower() in ("running", "active") else (st or "inactive")
|
||||
return ""
|
||||
return {"crm_state": one("pve-ha-crm"), "lrm_state": one("pve-ha-lrm")}
|
||||
|
||||
def units_for_node(node: str) -> Dict[str, str]:
|
||||
"""
|
||||
Preferuj /nodes/<node>/services. Normalizuj nazwy (bez .service)
|
||||
i mapuj różne pola na 'active'/'inactive'.
|
||||
"""
|
||||
wanted = {"watchdog-mux", "pve-ha-crm", "pve-ha-lrm"}
|
||||
svc = get_json(["pvesh", "get", f"/nodes/{node}/services"]) or []
|
||||
states: Dict[str, str] = {}
|
||||
|
||||
def norm_state(s: dict) -> str:
|
||||
# Proxmox bywa: {"state":"enabled","active":"active"} albo {"active":1} albo {"status":"running"} itp.
|
||||
raw_active = str(s.get("active", "")).lower()
|
||||
status = str(s.get("status", "")).lower()
|
||||
substate = str(s.get("substate", "")).lower()
|
||||
state = str(s.get("state", "")).lower()
|
||||
any_active = (
|
||||
raw_active in ("active", "running", "1", "true") or
|
||||
status in ("active", "running") or
|
||||
substate in ("running") or
|
||||
("running" in state)
|
||||
)
|
||||
return "active" if any_active else "inactive"
|
||||
|
||||
for s in svc:
|
||||
name_raw = (s.get("name") or "")
|
||||
name = re.sub(r"\.service$", "", name_raw)
|
||||
if name in wanted:
|
||||
states[name] = norm_state(s)
|
||||
|
||||
# fallback lokalny tylko jeśli API nic nie zwróciło
|
||||
if not states:
|
||||
for u in wanted:
|
||||
states[u] = "active" if is_active(u) else "inactive"
|
||||
|
||||
# zawsze zwróć pełny zestaw
|
||||
for u in wanted:
|
||||
states.setdefault(u, "inactive")
|
||||
|
||||
return states
|
||||
|
||||
# ---------------- snapshot ----------------
|
||||
def status_snapshot(node: str) -> Dict[str, Any]:
|
||||
vq = votequorum_brief()
|
||||
api = api_cluster_data()
|
||||
api["nodes"] = enrich_nodes(api.get("nodes", []))
|
||||
ha_raw = get_ha_status_raw().strip()
|
||||
parsed = parse_ha_manager(ha_raw)
|
||||
vmct_ix = cluster_vmct_index()
|
||||
|
||||
ha_status = api.get("ha_status") or []
|
||||
if not ha_status and parsed.get("nodes"):
|
||||
ha_status = [{
|
||||
"node": n.get("node"), "state": n.get("state"),
|
||||
"crm_state": n.get("crm", ""), "lrm_state": n.get("lrm", "")
|
||||
} for n in parsed["nodes"]]
|
||||
if not ha_status:
|
||||
for it in api.get("cluster_status", []):
|
||||
if it.get("type") == "node":
|
||||
ha_status.append({
|
||||
"node": it.get("name"),
|
||||
"state": "online" if it.get("online") else "offline",
|
||||
"crm_state": "", "lrm_state": ""
|
||||
})
|
||||
|
||||
enriched = []
|
||||
for n in ha_status:
|
||||
node_name = n.get("node"); crm = n.get("crm_state") or ""; lrm = n.get("lrm_state") or ""
|
||||
if node_name and (not crm or not lrm):
|
||||
try:
|
||||
svc = node_ha_services(node_name)
|
||||
if not crm: n["crm_state"] = svc.get("crm_state", "")
|
||||
if not lrm: n["lrm_state"] = svc.get("lrm_state", "")
|
||||
except Exception:
|
||||
pass
|
||||
enriched.append(n)
|
||||
api["ha_status"] = enriched
|
||||
|
||||
api["ha_resources"] = merge_resources(api.get("ha_resources", []), parsed.get("resources", []), vmct_ix)
|
||||
|
||||
units = units_for_node(node or socket.gethostname())
|
||||
return {
|
||||
"node_arg": node, "hostname": socket.gethostname(),
|
||||
"votequorum": vq, "units": units,
|
||||
"cfgtool": get_cfgtool().strip(), "pvecm": get_pvecm_status().strip(),
|
||||
"ha_raw": ha_raw, "replication": get_pvesr_status().strip(),
|
||||
"api": api, "ts": int(time.time())
|
||||
}
|
||||
|
||||
# ---------------- web ----------------
|
||||
@app.get("/")
|
||||
def index():
|
||||
node = request.args.get("node", DEFAULT_NODE)
|
||||
return render_template("index.html", title=APP_TITLE, node=node)
|
||||
|
||||
@app.get("/api/info")
|
||||
def api_info():
|
||||
node = request.args.get("node", DEFAULT_NODE)
|
||||
return jsonify(status_snapshot(node))
|
||||
|
||||
@app.get("/api/vm")
|
||||
def api_vm_detail():
|
||||
sid = request.args.get("sid", "")
|
||||
return jsonify(vm_detail_payload(sid))
|
||||
|
||||
@app.get("/api/node")
|
||||
def api_node_detail():
|
||||
name = request.args.get("name", "")
|
||||
return jsonify(node_detail_payload(name))
|
||||
|
||||
@app.get("/api/list-vmct")
|
||||
def api_list_vmct():
|
||||
meta = cluster_vmct_meta()
|
||||
ha_sids = {norm_sid(r.get("sid")) for r in (api_cluster_data().get("ha_resources") or []) if r.get("sid")}
|
||||
nonha = [v for k, v in meta.items() if k not in ha_sids]
|
||||
return jsonify({
|
||||
"nonha": nonha, "ha_index": list(ha_sids),
|
||||
"count_nonha": len(nonha), "count_all_vmct": len(meta)
|
||||
})
|
||||
|
||||
@app.post("/api/enable")
|
||||
def api_enable():
|
||||
if os.geteuid() != 0:
|
||||
return jsonify(ok=False, error="run as root"), 403
|
||||
data = request.get_json(force=True, silent=True) or {}
|
||||
node = data.get("node") or DEFAULT_NODE
|
||||
log: List[str] = []
|
||||
ha_node_maint(True, node, log)
|
||||
for u in HA_UNITS_STOP: stop_if_running(u, log)
|
||||
return jsonify(ok=True, log=log)
|
||||
|
||||
@app.post("/api/disable")
|
||||
def api_disable():
|
||||
if os.geteuid() != 0:
|
||||
return jsonify(ok=False, error="run as root"), 403
|
||||
data = request.get_json(force=True, silent=True) or {}
|
||||
node = data.get("node") or DEFAULT_NODE
|
||||
log: List[str] = []
|
||||
for u in HA_UNITS_START: start_if_needed(u, log)
|
||||
ha_node_maint(False, node, log)
|
||||
return jsonify(ok=True, log=log)
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
p = argparse.ArgumentParser()
|
||||
p.add_argument("--bind", default="127.0.0.1:8088", help="addr:port")
|
||||
p.add_argument("--node", default=DEFAULT_NODE, help="default node")
|
||||
args = p.parse_args()
|
||||
DEFAULT_NODE = args.node
|
||||
host, port = args.bind.split(":")
|
||||
app.run(host=host, port=int(port), debug=False, threaded=True)
|
Reference in New Issue
Block a user