#!/usr/bin/env python3
# /opt/pve-ha-web/app.py
"""Flask web panel showing Proxmox VE cluster / HA state.

Data is gathered by running the local PVE command-line tools (pvesh,
ha-manager, pvecm, corosync-quorumtool, corosync-cfgtool, pvesr, systemctl).
The POST endpoints toggle HA node maintenance and the HA systemd units and
refuse to run unless the process is root.
"""
import os
import re
import shlex
import socket
import json
import time
import subprocess
from typing import List, Dict, Any, Optional, Tuple

from flask import Flask, request, jsonify, render_template

APP_TITLE = "PVE HA Panel"
DEFAULT_NODE = socket.gethostname()
HA_UNITS_START = ["watchdog-mux", "pve-ha-crm", "pve-ha-lrm"]
HA_UNITS_STOP = list(reversed(HA_UNITS_START))

# Node names coming from requests are interpolated into pvesh API paths and
# passed as argv to ha-manager (as root).  Restrict them to hostname-like
# characters so they cannot contain "/" (path traversal) or start with "-"
# (option injection).
_NODE_NAME_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]*")

# pvesh sub-commands; post_json() replaces one of these with "create".
_PVESH_VERBS = ("get", "set", "create", "delete", "ls", "usage")

app = Flask(__name__, template_folder="templates", static_folder="static")


# ---------------- exec helpers ----------------

def run(cmd: List[str], timeout: int = 25) -> subprocess.CompletedProcess:
    """Run *cmd* (argv list, no shell) and capture its text output.

    A timeout or an OS error (e.g. the tool is not installed) is reported as a
    non-zero ``returncode`` with the reason in ``stderr`` instead of raising.
    Callers only inspect ``returncode``/``stdout``, so they degrade gracefully
    rather than failing the whole HTTP request.
    """
    try:
        return subprocess.run(cmd, check=False, text=True, capture_output=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        # 124 mirrors the exit status used by coreutils `timeout`.
        return subprocess.CompletedProcess(cmd, 124, "", f"timeout after {timeout}s")
    except OSError as e:
        # 127 mirrors the shell's "command not found" status.
        return subprocess.CompletedProcess(cmd, 127, "", str(e))


def get_text(cmd: List[str]) -> str:
    """Return stdout of *cmd*, or ``""`` if it exited non-zero."""
    r = run(cmd)
    return r.stdout if r.returncode == 0 else ""


def _with_json_output(cmd: List[str]) -> List[str]:
    """Return *cmd* with ``--output-format json`` appended for pvesh commands lacking it."""
    if cmd and cmd[0] == "pvesh" and "--output-format" not in cmd:
        return cmd + ["--output-format", "json"]
    return cmd


def _run_json(cmd: List[str]) -> Any:
    """Run *cmd* and decode stdout as JSON; ``None`` on failure, empty or non-JSON output."""
    r = run(cmd)
    if r.returncode != 0 or not r.stdout.strip():
        return None
    try:
        return json.loads(r.stdout)
    except ValueError:  # json.JSONDecodeError subclasses ValueError
        return None


def get_json(cmd: List[str]) -> Any:
    """Run a read command (pvesh output forced to JSON) and return decoded JSON or ``None``."""
    return _run_json(_with_json_output(cmd))


def post_json(cmd: List[str]) -> Any:
    """Run a pvesh command as ``pvesh create`` (POST) and return decoded JSON or ``None``.

    Like get_json() this requests ``--output-format json``.  Without it pvesh
    prints its default text format, which never decoded as JSON.
    """
    if cmd and cmd[0] == "pvesh" and len(cmd) > 1 and cmd[1] != "create":
        # Replace an explicit verb (e.g. "get"); otherwise insert "create"
        # in front of the API path.
        rest = cmd[2:] if cmd[1] in _PVESH_VERBS else cmd[1:]
        cmd = ["pvesh", "create"] + rest
    return _run_json(_with_json_output(cmd))


def is_active(unit: str) -> bool:
    """True if ``systemctl is-active`` reports *unit* active on this host."""
    return run(["systemctl", "is-active", "--quiet", unit]).returncode == 0


def start_if_needed(unit: str, out: List[str]) -> None:
    """Start *unit* unless it is already active; append progress/errors to *out*."""
    if not is_active(unit):
        out.append(f"+ start {unit}")
        r = run(["systemctl", "start", unit])
        if r.returncode != 0:
            out.append(f"ERR: {r.stderr.strip()}")


def stop_if_running(unit: str, out: List[str]) -> None:
    """Stop *unit* if it is active; append progress/errors to *out*."""
    if is_active(unit):
        out.append(f"- stop {unit}")
        r = run(["systemctl", "stop", unit])
        if r.returncode != 0:
            out.append(f"ERR: {r.stderr.strip()}")


def ha_node_maint(enable: bool, node: str, out: List[str]) -> None:
    """Enable or disable HA maintenance mode for *node* via ``ha-manager crm-command``.

    The executed command line, and stderr on failure (prefixed ``ERR:``), are
    appended to *out*.
    """
    cmd = ["ha-manager", "crm-command", "node-maintenance", "enable" if enable else "disable", node]
    out.append("$ " + " ".join(shlex.quote(x) for x in cmd))
    r = run(cmd)
    if r.returncode != 0:
        out.append(f"ERR: {r.stderr.strip()}")


# ---------------- collectors ----------------

def get_pvecm_status() -> str:
    """Raw ``pvecm status`` output (``""`` on failure)."""
    return get_text(["pvecm", "status"])


def get_quorumtool(short: bool = True) -> str:
    """Raw ``corosync-quorumtool -s`` (short) or ``-l`` (list) output."""
    return get_text(["corosync-quorumtool", "-s" if short else "-l"])


def get_cfgtool() -> str:
    """Raw ``corosync-cfgtool -s`` output."""
    return get_text(["corosync-cfgtool", "-s"])


def get_ha_status_raw() -> str:
    """Raw ``ha-manager status`` output."""
    return get_text(["ha-manager", "status"])


def get_pvesr_status() -> str:
    """Raw ``pvesr status`` output."""
    return get_text(["pvesr", "status"])


def votequorum_brief() -> Dict[str, Any]:
    """Summarise corosync votequorum state.

    Returns ``expected``, ``total`` and ``quorum`` (ints) and ``quorate``
    (``"yes"``/``"no"``), each ``None`` when absent from
    ``corosync-quorumtool -s``.  Also returns ``members``, the number of
    membership rows in ``corosync-quorumtool -l`` (``None`` when there are
    none).
    """
    out = get_quorumtool(True)
    rv: Dict[str, Any] = {}
    rx = {
        "expected": r"Expected votes:\s*(\d+)",
        "total": r"Total votes:\s*(\d+)",
        "quorum": r"Quorum:\s*(\d+)",
        "quorate": r"Quorate:\s*(Yes|No)",
    }
    for k, rgx in rx.items():
        m = re.search(rgx, out, re.I)
        if not m:
            rv[k] = None
        elif k == "quorate":
            rv[k] = m.group(1).lower()
        else:
            rv[k] = int(m.group(1))
    out_l = get_quorumtool(False)
    # Membership rows: two integers followed by a name token.
    lines = [ln for ln in out_l.splitlines() if re.match(r"^\s*\d+\s+\d+\s+\S+", ln)]
    rv["members"] = len(lines) if lines else None
    return rv


def api_cluster_data() -> Dict[str, Any]:
    """Fetch the raw cluster/HA API collections; each falls back to ``[]`` on failure."""
    return {
        "cluster_status": get_json(["pvesh", "get", "/cluster/status"]) or [],
        "ha_status": get_json(["pvesh", "get", "/cluster/ha/status"]) or [],
        "ha_resources": get_json(["pvesh", "get", "/cluster/ha/resources"]) or [],
        "ha_groups": get_json(["pvesh", "get", "/cluster/ha/groups"]) or [],
        "nodes": get_json(["pvesh", "get", "/nodes"]) or [],
    }


def enrich_nodes(nodes_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Add load/CPU/memory/rootfs figures from ``/nodes/<node>/status``.

    Entries are updated in place and returned in a new list.  Entries without
    a ``node`` name, or whose status call fails, pass through unchanged.
    """
    out: List[Dict[str, Any]] = []
    for n in nodes_list or []:
        name = n.get("node")
        if not name:
            out.append(n)
            continue
        detail = get_json(["pvesh", "get", f"/nodes/{name}/status"])
        if not isinstance(detail, dict):
            detail = {}
        if "loadavg" in detail:
            n["loadavg"] = detail["loadavg"]
        if "cpu" in detail:
            n["cpu"] = detail["cpu"]
        mem = detail.get("memory")
        if isinstance(mem, dict):
            n["mem"] = mem.get("used")
            n["maxmem"] = mem.get("total")
        rootfs = detail.get("rootfs")
        if isinstance(rootfs, dict):
            n["rootfs"] = rootfs.get("used")
            n["maxrootfs"] = rootfs.get("total")
        out.append(n)
    return out


# ---------------- ha-manager parser ----------------

def parse_ha_manager(text: str) -> Dict[str, Any]:
    """Parse ``ha-manager status`` text into node and resource records.

    Recognised line shapes (after stripping whitespace):

    * ``node: <name> (<state>)`` followed by ``lrm status: <s>`` / ``crm status: <s>``
    * ``service: <sid> on <node> (<flags>)`` and ``service: <sid> (<flags>) on <node>``
    * ``service <sid>: <state> on <node>``
    * ``master <node> (<state>, ...)`` / ``lrm <node> (<state>, ...)`` and
      ``service <sid> (<node>, <state>)``, the layout shown in the
      Proxmox VE admin guide

    Returns ``{"nodes": [...], "resources": [...]}``.  Node records carry
    ``node`` and optionally ``state``/``lrm``/``crm``.  Resource records carry
    ``sid``, ``node`` and ``state``, plus ``flags`` when the line had them.
    """
    nodes: Dict[str, Dict[str, str]] = {}
    resources: Dict[str, Dict[str, Any]] = {}
    current_node: Optional[str] = None

    def flag_state(flags: str) -> Optional[str]:
        # "started"/"stopped" anywhere in the flag list; otherwise unknown.
        if "started" in flags:
            return "started"
        if "stopped" in flags:
            return "stopped"
        return None

    for line in (text or "").splitlines():
        s = line.strip()

        m = re.match(r"node:\s+(\S+)\s+\(([^)]+)\)", s)
        if m:
            current_node = m.group(1)
            nodes.setdefault(current_node, {"node": current_node, "state": m.group(2)})
            continue

        m = re.match(r"(lrm|crm)\s+status:\s+(\S+)", s)
        if m and current_node:
            nodes[current_node][m.group(1)] = m.group(2)
            continue

        m = re.match(r"service:\s+(\S+)\s+on\s+(\S+)\s+\(([^)]+)\)", s)
        if m:
            sid, node, flags = m.group(1), m.group(2), m.group(3)
            resources[sid] = {"sid": sid, "node": node, "flags": flags, "state": flag_state(flags)}
            continue

        m = re.match(r"service:\s+(\S+)\s+\(([^)]*)\)\s+on\s+(\S+)", s)
        if m:
            sid, flags, node = m.group(1), m.group(2), m.group(3)
            resources[sid] = {"sid": sid, "node": node, "flags": flags, "state": flag_state(flags)}
            continue

        m = re.match(r"service\s+(\S+):\s+(\S+)\s+on\s+(\S+)", s)
        if m:
            sid, st, node = m.group(1), m.group(2), m.group(3)
            resources[sid] = {"sid": sid, "state": st, "node": node}
            continue

        # "master <node> (<state>, <time>)" / "lrm <node> (<state>, <time>)"
        m = re.match(r"(master|lrm)\s+(\S+)\s+\(([^,)]+)", s)
        if m:
            role, name, st = m.group(1), m.group(2), m.group(3).strip()
            rec = nodes.setdefault(name, {"node": name})
            rec["lrm" if role == "lrm" else "crm"] = st
            continue

        # "service <sid> (<node>, <state>)"
        m = re.match(r"service\s+(\S+)\s+\(([^,)]+),\s*([^)]+)\)", s)
        if m:
            sid, node, st = m.group(1), m.group(2).strip(), m.group(3).strip()
            resources[sid] = {"sid": sid, "node": node, "state": st}
            continue

    return {"nodes": list(nodes.values()), "resources": list(resources.values())}


# ---------------- SID utils / indexes ----------------

def norm_sid(s: Optional[str]) -> Optional[str]:
    """Normalise a guest id to HA-SID form (``vm:<id>`` / ``ct:<id>``).

    Accepts ``vm:N``/``ct:N`` and ``qemu/N``/``lxc/N``.  Any other non-empty
    value is returned as ``str(s)``; empty or ``None`` yields ``None``.
    """
    if not s:
        return None
    s = str(s)
    m = re.match(r"^(vm|ct):(\d+)$", s)
    if m:
        return f"{m.group(1)}:{m.group(2)}"
    m = re.match(r"^(qemu|lxc)/(\d+)$", s)
    if m:
        return ("vm" if m.group(1) == "qemu" else "ct") + f":{m.group(2)}"
    return s


def cluster_vmct_index() -> Dict[str, str]:
    """Map guest SID -> hosting node for every qemu/lxc entry in ``/cluster/resources``."""
    items = get_json(["pvesh", "get", "/cluster/resources"]) or []
    idx: Dict[str, str] = {}
    for it in items:
        t = it.get("type")
        if t not in ("qemu", "lxc"):
            continue
        vmid = it.get("vmid")
        node = it.get("node")
        if vmid is not None:
            sid = ("vm" if t == "qemu" else "ct") + f":{vmid}"
        else:
            sid = norm_sid(it.get("id"))
        if sid and node:
            idx[sid] = node
    return idx


def cluster_vmct_meta() -> Dict[str, Dict[str, Any]]:
    """Map guest SID -> summary fields (type, vmid, node, name, status, hastate)."""
    items = get_json(["pvesh", "get", "/cluster/resources"]) or []
    meta: Dict[str, Dict[str, Any]] = {}
    for it in items:
        t = it.get("type")
        if t not in ("qemu", "lxc"):
            continue
        sid = norm_sid(it.get("id")) or (("vm" if t == "qemu" else "ct") + f":{it.get('vmid')}")
        if sid:
            meta[sid] = {
                "sid": sid, "type": t, "vmid": it.get("vmid"), "node": it.get("node"),
                "name": it.get("name"), "status": it.get("status"), "hastate": it.get("hastate"),
            }
    return meta


def merge_resources(api_res: List[Dict[str, Any]], parsed_res: List[Dict[str, Any]],
                    vmct_idx: Dict[str, str]) -> List[Dict[str, Any]]:
    """Merge HA resources from the API with runtime state parsed from ``ha-manager``.

    Records are keyed by normalised SID.  Non-empty parsed values override API
    values, and a missing ``node`` is filled from *vmct_idx*.
    """
    by_sid: Dict[str, Dict[str, Any]] = {}
    # seed from /cluster/ha/resources
    for r in (api_res or []):
        sid = norm_sid(r.get("sid"))
        if not sid:
            continue
        x = dict(r)
        x["sid"] = sid
        by_sid[sid] = x
    # merge runtime state from ha-manager
    for r in (parsed_res or []):
        sid = norm_sid(r.get("sid"))
        if not sid:
            continue
        x = by_sid.get(sid, {"sid": sid})
        for k, v in r.items():
            if k != "sid" and v not in (None, ""):
                x[k] = v
        by_sid[sid] = x
    # fill node from /cluster/resources
    for sid, x in by_sid.items():
        if not x.get("node") and sid in vmct_idx:
            x["node"] = vmct_idx[sid]
    return list(by_sid.values())


# ---------------- VM details ----------------

def sid_to_tuple(sid: str, meta: Dict[str, Dict[str, Any]]) -> Optional[Tuple[str, int, Optional[str]]]:
    """Split a SID into ``(api_type, vmid, node)``.

    *api_type* is ``"qemu"`` or ``"lxc"``.  *node* is looked up in *meta* and
    may be ``None``.  Returns ``None`` for SIDs that are not ``vm:N``/``ct:N``
    after normalisation.
    """
    sid_n = norm_sid(sid)
    if not sid_n:
        return None
    m = re.match(r"^(vm|ct):(\d+)$", sid_n)
    if not m:
        return None
    typ = "qemu" if m.group(1) == "vm" else "lxc"
    vmid = int(m.group(2))
    node = (meta.get(sid_n) or {}).get("node")
    return (typ, vmid, node)


def _agent_call(base: str, command: str) -> Any:
    """Run a read-only guest-agent *command* for the VM at API path *base*.

    NOTE(review): current PVE API docs appear to list get-osinfo /
    network-get-interfaces as GET endpoints.  GET is tried first, and the POST
    this code originally used is kept as a fallback.  Returns ``None`` when
    both fail.
    """
    return (get_json(["pvesh", "get", f"{base}/agent/{command}"])
            or post_json(["pvesh", "create", f"{base}/agent/{command}"])
            or None)


def vm_detail_payload(sid: str) -> Dict[str, Any]:
    """Build the /api/vm payload: meta, current status, config and (qemu only) agent data."""
    meta = cluster_vmct_meta()
    tup = sid_to_tuple(sid, meta)
    if not tup:
        return {"sid": sid, "error": "bad sid"}
    typ, vmid, node = tup
    if not node:
        return {"sid": sid, "error": "unknown node"}
    sid_n = norm_sid(sid)
    base = f"/nodes/{node}/{typ}/{vmid}"
    current = get_json(["pvesh", "get", f"{base}/status/current"]) or {}
    config = get_json(["pvesh", "get", f"{base}/config"]) or {}
    agent: Optional[Dict[str, Any]] = None
    if typ == "qemu":
        agent = {
            "info": get_json(["pvesh", "get", f"{base}/agent/info"]),
            "osinfo": _agent_call(base, "get-osinfo"),
            "ifaces": _agent_call(base, "network-get-interfaces"),
        }
    return {
        "sid": sid_n, "node": node, "type": typ, "vmid": vmid,
        "meta": meta.get(sid_n, {}),
        "current": current, "config": config,
        "agent": agent,
    }


# ---------------- Node details ----------------

def _valid_node(name: Any) -> bool:
    """True if *name* is a string that looks like a node (host) name."""
    return isinstance(name, str) and _NODE_NAME_RE.fullmatch(name) is not None


def node_detail_payload(name: str) -> Dict[str, Any]:
    """Build the /api/node payload from several ``/nodes/<name>/...`` endpoints.

    Each failed call falls back to an empty dict/list.  Returns an ``error``
    dict for an empty or malformed *name*.
    """
    if not name:
        return {"error": "no node"}
    if not _valid_node(name):
        return {"node": name, "error": "bad node"}
    status = get_json(["pvesh", "get", f"/nodes/{name}/status"]) or {}
    version = get_json(["pvesh", "get", f"/nodes/{name}/version"]) or {}
    timeinfo = get_json(["pvesh", "get", f"/nodes/{name}/time"]) or {}
    services = get_json(["pvesh", "get", f"/nodes/{name}/services"]) or []
    network_cfg = get_json(["pvesh", "get", f"/nodes/{name}/network"]) or []
    netstat = get_json(["pvesh", "get", f"/nodes/{name}/netstat"]) or []  # may be empty on some versions
    disks = get_json(["pvesh", "get", f"/nodes/{name}/disks/list"]) or []
    subscription = get_json(["pvesh", "get", f"/nodes/{name}/subscription"]) or {}
    return {
        "node": name, "status": status, "version": version, "time": timeinfo,
        "services": services, "network_cfg": network_cfg, "netstat": netstat,
        "disks": disks, "subscription": subscription,
    }


def node_ha_services(node: str) -> Dict[str, str]:
    """Return ``{"crm_state": ..., "lrm_state": ...}`` for *node* from ``/nodes/<node>/services``.

    A value is ``"active"`` when the service's ``state`` (or ``active``) field
    is running/active/1/true.  Otherwise it is that field's text, or
    ``"inactive"`` when the field is empty.  It is ``""`` when the service is
    not listed.
    """
    svcs = get_json(["pvesh", "get", f"/nodes/{node}/services"]) or []

    def one(name: str) -> str:
        for s in svcs:
            # Normalise like units_for_node(): drop a trailing ".service".
            if re.sub(r"\.service$", "", str(s.get("name") or "")) == name:
                st = str(s.get("state") or s.get("active") or "")
                if st.lower() in ("running", "active", "1", "true"):
                    return "active"
                return st or "inactive"
        return ""

    return {"crm_state": one("pve-ha-crm"), "lrm_state": one("pve-ha-lrm")}


def units_for_node(node: str) -> Dict[str, str]:
    """Report watchdog-mux / pve-ha-crm / pve-ha-lrm as ``"active"`` or ``"inactive"``.

    Prefers ``/nodes/<node>/services``.  Service names are normalised (a
    trailing ``.service`` is dropped) and several possible status fields are
    mapped onto active/inactive.  Only if the API yields none of the wanted
    units does it fall back to local ``systemctl is-active`` checks, which
    describe this host rather than necessarily *node*.  All three units are
    always present in the result.
    """
    wanted = tuple(HA_UNITS_START)
    svc = get_json(["pvesh", "get", f"/nodes/{node}/services"]) or []
    states: Dict[str, str] = {}

    def norm_state(s: Dict[str, Any]) -> str:
        # Proxmox may report e.g. {"state": "enabled", "active": "active"},
        # {"active": 1}, {"status": "running"}, etc.
        raw_active = str(s.get("active", "")).lower()
        active_state = str(s.get("active-state", "")).lower()
        status = str(s.get("status", "")).lower()
        substate = str(s.get("substate", "")).lower()
        state = str(s.get("state", "")).lower()
        any_active = (
            raw_active in ("active", "running", "1", "true")
            or active_state == "active"
            or status in ("active", "running")
            # Previously `substate in ("running")`: a substring test against
            # the string "running", so a missing substate ("") always matched
            # and every listed unit was reported active.
            or substate == "running"
            or "running" in state
        )
        return "active" if any_active else "inactive"

    for s in svc:
        name = re.sub(r"\.service$", "", s.get("name") or "")
        if name in wanted:
            states[name] = norm_state(s)

    # local fallback only if the API returned none of the wanted units
    if not states:
        for u in wanted:
            states[u] = "active" if is_active(u) else "inactive"

    # always return the full set
    for u in wanted:
        states.setdefault(u, "inactive")
    return states


# ---------------- snapshot ----------------

def status_snapshot(node: str) -> Dict[str, Any]:
    """Collect everything returned by /api/info for *node*.

    ``api["ha_status"]`` holds one row per node with ``node``, ``state``,
    ``crm_state`` and ``lrm_state``.  Sources are tried in order:
    API rows that name a node, then parsed ``ha-manager status``, then
    ``/cluster/status`` node entries.  Missing CRM/LRM states are filled
    best-effort from each node's service list.
    """
    vq = votequorum_brief()
    api = api_cluster_data()
    api["nodes"] = enrich_nodes(api.get("nodes", []))
    ha_raw = get_ha_status_raw().strip()
    parsed = parse_ha_manager(ha_raw)
    vmct_ix = cluster_vmct_index()

    cluster_nodes = [it for it in api.get("cluster_status", []) if it.get("type") == "node"]
    online_state = {it.get("name"): ("online" if it.get("online") else "offline") for it in cluster_nodes}

    # NOTE(review): /cluster/ha/status appears to be a directory index
    # ([{"name": "current"}, ...]) rather than per-node rows.  Such entries
    # carry no "node" key and used to suppress both fallbacks below, so only
    # keep rows that name a node.
    ha_status = [n for n in (api.get("ha_status") or []) if isinstance(n, dict) and n.get("node")]
    if not ha_status and parsed.get("nodes"):
        ha_status = [{
            "node": n.get("node"),
            # Rows parsed from "lrm <node> (...)" lines carry no node state.
            "state": n.get("state") or online_state.get(n.get("node")),
            "crm_state": n.get("crm", ""),
            "lrm_state": n.get("lrm", ""),
        } for n in parsed["nodes"]]
    if not ha_status:
        ha_status = [{
            "node": it.get("name"),
            "state": "online" if it.get("online") else "offline",
            "crm_state": "",
            "lrm_state": "",
        } for it in cluster_nodes]

    for n in ha_status:
        node_name = n.get("node")
        crm = n.get("crm_state") or ""
        lrm = n.get("lrm_state") or ""
        if not node_name or (crm and lrm):
            continue
        try:
            svc = node_ha_services(node_name)
        except Exception:
            # Best effort: an unexpected /services payload for one node must
            # not break the whole snapshot.
            continue
        if not crm:
            n["crm_state"] = svc.get("crm_state", "")
        if not lrm:
            n["lrm_state"] = svc.get("lrm_state", "")

    api["ha_status"] = ha_status
    api["ha_resources"] = merge_resources(api.get("ha_resources", []), parsed.get("resources", []), vmct_ix)

    units = units_for_node(node or socket.gethostname())

    return {
        "node_arg": node,
        "hostname": socket.gethostname(),
        "votequorum": vq,
        "units": units,
        "cfgtool": get_cfgtool().strip(),
        "pvecm": get_pvecm_status().strip(),
        "ha_raw": ha_raw,
        "replication": get_pvesr_status().strip(),
        "api": api,
        "ts": int(time.time()),
    }


# ---------------- web ----------------

def _log_ok(log: List[str]) -> bool:
    """True when no step appended an ``ERR:`` line to *log*."""
    return not any(line.startswith("ERR:") for line in log)


def _posted_node() -> Optional[str]:
    """Node from the JSON request body (default DEFAULT_NODE); ``None`` if malformed."""
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        data = {}
    node = data.get("node") or DEFAULT_NODE
    return node if _valid_node(node) else None


@app.get("/")
def index():
    node = request.args.get("node", DEFAULT_NODE)
    return render_template("index.html", title=APP_TITLE, node=node)


@app.get("/api/info")
def api_info():
    node = request.args.get("node", DEFAULT_NODE)
    # An empty value keeps the old meaning (this host); anything else must be a node name.
    if node and not _valid_node(node):
        return jsonify(error="bad node"), 400
    return jsonify(status_snapshot(node))


@app.get("/api/vm")
def api_vm_detail():
    sid = request.args.get("sid", "")
    return jsonify(vm_detail_payload(sid))


@app.get("/api/node")
def api_node_detail():
    name = request.args.get("name", "")
    return jsonify(node_detail_payload(name))


@app.get("/api/list-vmct")
def api_list_vmct():
    meta = cluster_vmct_meta()
    ha_res = get_json(["pvesh", "get", "/cluster/ha/resources"]) or []
    ha_sids = {norm_sid(r.get("sid")) for r in ha_res if r.get("sid")}
    nonha = [v for k, v in meta.items() if k not in ha_sids]
    return jsonify({
        "nonha": nonha,
        "ha_index": sorted(ha_sids),
        "count_nonha": len(nonha),
        "count_all_vmct": len(meta),
    })


@app.post("/api/enable")
def api_enable():
    """Put the node into HA maintenance, then stop the HA units (LRM first)."""
    if os.geteuid() != 0:
        return jsonify(ok=False, error="run as root"), 403
    node = _posted_node()
    if node is None:
        return jsonify(ok=False, error="bad node"), 400
    log: List[str] = []
    ha_node_maint(True, node, log)
    for u in HA_UNITS_STOP:
        stop_if_running(u, log)
    return jsonify(ok=_log_ok(log), log=log)


@app.post("/api/disable")
def api_disable():
    """Start the HA units (watchdog first), then take the node out of HA maintenance."""
    if os.geteuid() != 0:
        return jsonify(ok=False, error="run as root"), 403
    node = _posted_node()
    if node is None:
        return jsonify(ok=False, error="bad node"), 400
    log: List[str] = []
    for u in HA_UNITS_START:
        start_if_needed(u, log)
    ha_node_maint(False, node, log)
    return jsonify(ok=_log_ok(log), log=log)


if __name__ == "__main__":
    import argparse

    p = argparse.ArgumentParser()
    p.add_argument("--bind", default="127.0.0.1:8088", help="addr:port")
    p.add_argument("--node", default=DEFAULT_NODE, help="default node")
    args = p.parse_args()
    DEFAULT_NODE = args.node
    # rpartition so "[::1]:8088" (IPv6) splits on the last colon.
    host, sep, port = args.bind.rpartition(":")
    if not sep or not port.isdigit():
        p.error("--bind must be addr:port")
    app.run(host=host.strip("[]"), port=int(port), debug=False, threaded=True)