Files
pve-ha-web/app.py
Mateusz Gruszczyński 8c545aca55 vm management
2025-10-17 15:55:21 +02:00

508 lines
19 KiB
Python

#!/usr/bin/env python3
# /opt/pve-ha-web/app.py
import os
import re
import shlex
import socket
import json
import time
import subprocess
from typing import List, Dict, Any, Optional, Tuple
from flask import Flask, request, jsonify, render_template
# Title passed to the index template.
APP_TITLE = "PVE HA Panel"
# Node used when a request does not name one: this machine's hostname.
DEFAULT_NODE = socket.gethostname()
# HA-related systemd units, listed in the order they are started ...
HA_UNITS_START = ["watchdog-mux", "pve-ha-crm", "pve-ha-lrm"]
# ... and the same units reversed, which is the order they are stopped in.
HA_UNITS_STOP = HA_UNITS_START[::-1]
# The Flask application object all routes below are registered on.
app = Flask(__name__, template_folder="templates", static_folder="static")
# ---------------- exec helpers ----------------
def run(cmd: List[str], timeout: int = 25) -> subprocess.CompletedProcess:
    """Run *cmd* without a shell, capturing stdout/stderr as text.

    A non-zero exit status is never raised; it is reported through
    ``returncode``.  Failures to run the command at all are reported the
    same way, so callers that only inspect ``returncode`` keep working:

    * 124 - the command ran longer than *timeout* seconds,
    * 127 - the executable could not be found,
    * 126 - any other OS-level error while launching it.

    In those cases ``stdout`` is "" and ``stderr`` carries a short message.
    """
    try:
        return subprocess.run(cmd, check=False, text=True, capture_output=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return subprocess.CompletedProcess(cmd, 124, "", f"timeout after {timeout}s")
    except FileNotFoundError as exc:
        return subprocess.CompletedProcess(cmd, 127, "", str(exc))
    except OSError as exc:
        return subprocess.CompletedProcess(cmd, 126, "", str(exc))
def get_text(cmd: List[str]) -> str:
    """Return the stdout of *cmd*, or "" when it exits with a non-zero status."""
    result = run(cmd)
    if result.returncode != 0:
        return ""
    return result.stdout
def get_json(cmd: List[str]) -> Any:
    """Run *cmd* and return its stdout decoded as JSON, or None.

    ``pvesh`` invocations get ``--output-format json`` appended unless the
    caller already passed an ``--output-format`` argument.  None is returned
    when the command exits non-zero, prints only whitespace, or prints
    something that does not decode as JSON.
    """
    needs_json_flag = bool(cmd) and cmd[0] == "pvesh" and "--output-format" not in cmd
    if needs_json_flag:
        cmd = cmd + ["--output-format", "json"]
    result = run(cmd)
    if result.returncode != 0:
        return None
    raw = result.stdout
    if not raw.strip():
        return None
    try:
        return json.loads(raw)
    except Exception:
        return None
def post_json(cmd: List[str]) -> Any:
    """Run a POST-style ``pvesh create`` call and return its JSON result, or None.

    For ``pvesh`` commands with more than two elements the verb is forced to
    ``create``: an existing ``get``/``set``/``delete`` verb is replaced, and
    if the second element is not one of those verbs ``create`` is inserted
    in front of it.  ``--output-format json`` is appended when the caller
    did not choose a format, matching ``get_json`` so that the output can
    actually be decoded.

    None is returned when the command exits non-zero, prints only
    whitespace, or prints something that is not JSON.
    """
    if cmd and cmd[0] == "pvesh":
        if len(cmd) > 2 and cmd[1] != "create":
            # Replace a different verb instead of producing "pvesh create get ...".
            rest = cmd[2:] if cmd[1] in ("get", "set", "delete") else cmd[1:]
            cmd = ["pvesh", "create"] + rest
        if "--output-format" not in cmd:
            cmd = cmd + ["--output-format", "json"]
    r = run(cmd)
    if r.returncode != 0 or not r.stdout.strip():
        return None
    try:
        return json.loads(r.stdout)
    except Exception:
        # Best effort: any decoding problem simply means "no usable result".
        return None
def is_active(unit: str) -> bool:
    """Return True when ``systemctl is-active --quiet`` exits 0 for *unit*."""
    probe = run(["systemctl", "is-active", "--quiet", unit])
    return probe.returncode == 0
def start_if_needed(unit: str, out: List[str]) -> None:
    """Start systemd *unit* unless it is already active.

    Appends "+ start <unit>" to *out* before starting.  If ``systemctl
    start`` exits non-zero, an "ERR: ..." line with its stderr is appended
    as well, so a failed start is visible in the returned log.
    """
    if is_active(unit):
        return
    out.append(f"+ start {unit}")
    r = run(["systemctl", "start", unit])
    if r.returncode != 0:
        out.append(f"ERR: {r.stderr.strip()}")
def stop_if_running(unit: str, out: List[str]) -> None:
    """Stop systemd *unit* if it is currently active.

    Appends "- stop <unit>" to *out* before stopping.  If ``systemctl stop``
    exits non-zero, an "ERR: ..." line with its stderr is appended as well,
    so a failed stop is visible in the returned log.
    """
    if not is_active(unit):
        return
    out.append(f"- stop {unit}")
    r = run(["systemctl", "stop", unit])
    if r.returncode != 0:
        out.append(f"ERR: {r.stderr.strip()}")
def ha_node_maint(enable: bool, node: str, out: List[str]) -> None:
    """Switch HA node-maintenance for *node* on or off via ``ha-manager``.

    The command line is appended to *out* (shell-quoted, prefixed with "$ ")
    before it runs; a non-zero exit appends an "ERR: ..." line with stderr.

    *node* can originate from request data, so anything that is not a plain
    host-style name (letters, digits, "." and "-", starting with a letter or
    digit) is rejected with an "ERR: ..." line and nothing is executed.
    This keeps values such as "--help" from being read as options.
    """
    if not isinstance(node, str) or not re.fullmatch(r"[A-Za-z0-9][A-Za-z0-9.-]*", node):
        out.append(f"ERR: invalid node name {node!r}")
        return
    mode = "enable" if enable else "disable"
    cmd = ["ha-manager", "crm-command", "node-maintenance", mode, node]
    out.append("$ " + " ".join(shlex.quote(x) for x in cmd))
    r = run(cmd)
    if r.returncode != 0:
        out.append(f"ERR: {r.stderr.strip()}")
# ---------------- collectors ----------------
def get_pvecm_status() -> str:
    """Return the text printed by ``pvecm status`` as delivered by get_text."""
    return get_text(["pvecm", "status"])
def get_quorumtool(short: bool = True) -> str:
    """Return ``corosync-quorumtool`` output: ``-s`` when *short*, else ``-l``."""
    flag = "-s" if short else "-l"
    return get_text(["corosync-quorumtool", flag])
def get_cfgtool() -> str:
    """Return the text printed by ``corosync-cfgtool -s`` as delivered by get_text."""
    return get_text(["corosync-cfgtool", "-s"])
def get_ha_status_raw() -> str:
    """Return the text printed by ``ha-manager status`` as delivered by get_text."""
    return get_text(["ha-manager", "status"])
def get_pvesr_status() -> str:
    """Return the text printed by ``pvesr status`` as delivered by get_text."""
    return get_text(["pvesr", "status"])
def votequorum_brief() -> Dict[str, Any]:
    """Summarise ``corosync-quorumtool`` output as a small dict.

    Keys "expected", "total" and "quorum" hold ints, "quorate" holds the
    lower-cased "yes"/"no"; each is None when its line is not found in the
    ``-s`` output.  "members" is the number of ``-l`` output lines that start
    with two integers followed by a further token, or None when there are none.
    """
    summary_text = get_quorumtool(True)
    patterns = {
        "expected": r"Expected votes:\s*(\d+)",
        "total": r"Total votes:\s*(\d+)",
        "quorum": r"Quorum:\s*(\d+)",
        "quorate": r"Quorate:\s*(Yes|No)"
    }
    brief: Dict[str, Any] = {}
    for key, pattern in patterns.items():
        hit = re.search(pattern, summary_text, re.I)
        if not hit:
            brief[key] = None
        elif key == "quorate":
            brief[key] = hit.group(1).lower()
        else:
            brief[key] = int(hit.group(1))

    member_rows = [
        row
        for row in get_quorumtool(False).splitlines()
        if re.match(r"^\s*\d+\s+\d+\s+\S+", row)
    ]
    brief["members"] = len(member_rows) or None
    return brief
def api_cluster_data() -> Dict[str, Any]:
    """Fetch the cluster-wide pvesh views used by the panel.

    Each key maps to the decoded result of one ``pvesh get`` call, or to an
    empty list when that call yields nothing.
    """
    endpoints = (
        ("cluster_status", "/cluster/status"),
        ("ha_status", "/cluster/ha/status"),
        ("ha_resources", "/cluster/ha/resources"),
        ("ha_groups", "/cluster/ha/groups"),
        ("nodes", "/nodes"),
    )
    return {key: get_json(["pvesh", "get", path]) or [] for key, path in endpoints}
def enrich_nodes(nodes_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Add per-node status figures to each entry of *nodes_list*.

    For every entry that has a "node" name, ``/nodes/<name>/status`` is
    queried and, where present, copied into the entry: "loadavg", "cpu",
    memory used/total as "mem"/"maxmem", and rootfs used/total as
    "rootfs"/"maxrootfs".  Entries are updated in place and returned in
    their original order; entries without a name pass through unchanged.
    """
    enriched: List[Dict[str, Any]] = []
    for entry in nodes_list or []:
        node_name = entry.get("node")
        if node_name:
            status = get_json(["pvesh", "get", f"/nodes/{node_name}/status"]) or {}
            for key in ("loadavg", "cpu"):
                if key in status:
                    entry[key] = status[key]
            usage_fields = (
                ("memory", "mem", "maxmem"),
                ("rootfs", "rootfs", "maxrootfs"),
            )
            for source, used_key, total_key in usage_fields:
                if source in status:
                    entry[used_key] = status[source].get("used")
                    entry[total_key] = status[source].get("total")
        enriched.append(entry)
    return enriched
# ---------------- ha-manager parser ----------------
def parse_ha_manager(text: str) -> Dict[str, Any]:
    """Parse ``ha-manager status`` text into node and resource records.

    Returns ``{"nodes": [...], "resources": [...]}``.  Recognised lines:

    * ``node: NAME (STATE)`` followed by ``lrm status: X`` / ``crm status: X``
      -> node record with "node", "state" and "lrm"/"crm".
    * ``master NAME (MODE, ...)`` / ``lrm NAME (MODE, ...)`` - the layout
      shown in the Proxmox HA documentation -> node record with "node" and
      "crm" (for master) or "lrm"; no "state" key is invented for these.
    * ``service: SID on NODE (FLAGS)`` and ``service: SID (FLAGS) on NODE``
      -> resource with "sid", "node", "flags" and "state" ("started" or
      "stopped" when FLAGS mentions one, otherwise None).
    * ``service SID: STATE on NODE`` -> resource with "sid", "state", "node".
    * ``service SID (NODE, STATE)`` - documented layout -> resource with
      "sid", "node", "state".

    Unrecognised lines are ignored; a later line for the same SID replaces
    the earlier record.  Empty or None *text* yields two empty lists.
    """
    def state_from_flags(flags: str) -> Optional[str]:
        # Only these two states are derived from free-form flag text.
        if "started" in flags:
            return "started"
        if "stopped" in flags:
            return "stopped"
        return None

    nodes: Dict[str, Dict[str, str]] = {}
    resources: Dict[str, Dict[str, Any]] = {}
    current_node: Optional[str] = None

    for line in (text or "").splitlines():
        s = line.strip()

        m = re.match(r"node:\s+(\S+)\s+\(([^)]+)\)", s)
        if m:
            current_node = m.group(1)
            nodes.setdefault(current_node, {"node": current_node, "state": m.group(2)})
            continue

        m = re.match(r"(lrm|crm)\s+status:\s+(\S+)", s)
        if m and current_node:
            nodes[current_node][m.group(1)] = m.group(2)
            continue

        # Documented layout: "master NAME (MODE, timestamp)" / "lrm NAME (MODE, timestamp)".
        m = re.match(r"(master|lrm)\s+(\S+)\s+\(([^,)]*)", s)
        if m:
            key = "crm" if m.group(1) == "master" else "lrm"
            nodes.setdefault(m.group(2), {"node": m.group(2)})[key] = m.group(3).strip()
            continue

        m = re.match(r"service:\s+(\S+)\s+on\s+(\S+)\s+\(([^)]+)\)", s)
        if m:
            sid, node, flags = m.group(1), m.group(2), m.group(3)
            resources[sid] = {"sid": sid, "node": node, "flags": flags,
                              "state": state_from_flags(flags)}
            continue

        m = re.match(r"service:\s+(\S+)\s+\(([^)]*)\)\s+on\s+(\S+)", s)
        if m:
            sid, flags, node = m.group(1), m.group(2), m.group(3)
            resources[sid] = {"sid": sid, "node": node, "flags": flags,
                              "state": state_from_flags(flags)}
            continue

        m = re.match(r"service\s+(\S+):\s+(\S+)\s+on\s+(\S+)", s)
        if m:
            sid, st, node = m.group(1), m.group(2), m.group(3)
            resources[sid] = {"sid": sid, "state": st, "node": node}
            continue

        # Documented layout: "service SID (NODE, STATE)".
        m = re.match(r"service\s+(\S+)\s+\(([^,)]+),\s*([^,)]+)", s)
        if m:
            sid = m.group(1)
            resources[sid] = {"sid": sid, "node": m.group(2).strip(),
                              "state": m.group(3).strip()}
            continue

    return {"nodes": list(nodes.values()), "resources": list(resources.values())}
# ---------------- SID utils / indexes ----------------
def norm_sid(s: Optional[str]) -> Optional[str]:
    """Normalise a service/resource id to the ``vm:<id>`` / ``ct:<id>`` form.

    Falsy input gives None.  ``qemu/<id>`` becomes ``vm:<id>`` and
    ``lxc/<id>`` becomes ``ct:<id>``; ids already in ``vm:``/``ct:`` form are
    rebuilt from their matched parts.  Anything else is returned as ``str(s)``.
    """
    if not s:
        return None
    text = str(s)

    canonical = re.match(r"^(vm|ct):(\d+)$", text)
    if canonical:
        kind, number = canonical.groups()
        return f"{kind}:{number}"

    legacy = re.match(r"^(qemu|lxc)/(\d+)$", text)
    if legacy:
        kind = "vm" if legacy.group(1) == "qemu" else "ct"
        return kind + f":{legacy.group(2)}"

    return text
def cluster_vmct_index() -> Dict[str, str]:
    """Map each VM/CT sid to the node ``/cluster/resources`` reports for it.

    Only entries of type "qemu" or "lxc" are considered.  The sid is built
    from "vmid" when present, otherwise from the normalised "id"; entries
    lacking a sid or a node are left out.
    """
    index: Dict[str, str] = {}
    for res in get_json(["pvesh", "get", "/cluster/resources"]) or []:
        kind = res.get("type")
        if kind not in ("qemu", "lxc"):
            continue
        vmid = res.get("vmid")
        node = res.get("node")
        if vmid is not None:
            prefix = "vm" if kind == "qemu" else "ct"
            sid = prefix + f":{vmid}"
        else:
            sid = norm_sid(res.get("id"))
        if sid and node:
            index[sid] = node
    return index
def cluster_vmct_meta() -> Dict[str, Dict[str, Any]]:
    """Return per-sid metadata for every VM/CT in ``/cluster/resources``.

    Only entries of type "qemu" or "lxc" are kept.  The sid comes from the
    normalised "id" field, falling back to one built from the type and
    "vmid".  Each record carries sid, type, vmid, node, name, status and
    hastate as reported by the API.
    """
    records: Dict[str, Dict[str, Any]] = {}
    for res in get_json(["pvesh", "get", "/cluster/resources"]) or []:
        kind = res.get("type")
        if kind not in ("qemu", "lxc"):
            continue
        sid = norm_sid(res.get("id"))
        if not sid:
            prefix = "vm" if kind == "qemu" else "ct"
            sid = prefix + f":{res.get('vmid')}"
        records[sid] = {
            "sid": sid,
            "type": kind,
            "vmid": res.get("vmid"),
            "node": res.get("node"),
            "name": res.get("name"),
            "status": res.get("status"),
            "hastate": res.get("hastate"),
        }
    return records
def merge_resources(api_res: List[Dict[str, Any]],
                    parsed_res: List[Dict[str, Any]],
                    vmct_idx: Dict[str, str]) -> List[Dict[str, Any]]:
    """Combine HA resource records from three sources, keyed by normalised sid.

    1. *api_res* entries are copied and seed the result.
    2. *parsed_res* entries overlay them: every key except "sid" whose value
       is neither None nor "" overwrites the seeded value.
    3. Records still lacking a "node" take it from *vmct_idx*.

    Entries without a usable sid are skipped.
    """
    merged: Dict[str, Dict[str, Any]] = {}

    # Step 1: configuration as seeded from the API listing.
    for item in api_res or []:
        sid = norm_sid(item.get("sid"))
        if sid:
            merged[sid] = {**item, "sid": sid}

    # Step 2: runtime details parsed from ha-manager output.
    for item in parsed_res or []:
        sid = norm_sid(item.get("sid"))
        if not sid:
            continue
        record = merged.setdefault(sid, {"sid": sid})
        record.update(
            {key: value for key, value in item.items()
             if key != "sid" and value not in (None, "")}
        )

    # Step 3: fall back to the VM/CT index for the node.
    for sid, record in merged.items():
        if sid in vmct_idx and not record.get("node"):
            record["node"] = vmct_idx[sid]

    return list(merged.values())
# ---------------- VM details ----------------
def sid_to_tuple(sid: str, meta: Dict[str, Dict[str, Any]]) -> Optional[Tuple[str, int, str]]:
    """Resolve *sid* to ``(api_type, vmid, node)``.

    ``api_type`` is "qemu" for ``vm:`` sids and "lxc" for ``ct:`` sids.  The
    node is looked up in *meta* and is None when the sid is not listed
    there.  Returns None when *sid* does not normalise to ``vm:<id>`` or
    ``ct:<id>``.
    """
    canonical = norm_sid(sid)
    parts = re.match(r"^(vm|ct):(\d+)$", canonical) if canonical else None
    if not parts:
        return None
    prefix, number = parts.groups()
    api_type = "qemu" if prefix == "vm" else "lxc"
    node = (meta.get(canonical) or {}).get("node")
    return (api_type, int(number), node)
def vm_detail_payload(sid: str) -> Dict[str, Any]:
    """Collect status, config and (for VMs) guest-agent data for one sid.

    Returns ``{"sid": sid, "error": "bad sid"}`` when *sid* cannot be
    resolved and ``{"sid": sid, "error": "unknown node"}`` when no node is
    known for it.  Otherwise the payload carries the normalised sid, node,
    type, vmid, the cluster metadata record, ``status/current``, ``config``
    and an "agent" dict (None for containers).
    """
    meta = cluster_vmct_meta()
    resolved = sid_to_tuple(sid, meta)
    if not resolved:
        return {"sid": sid, "error": "bad sid"}
    typ, vmid, node = resolved
    if not node:
        return {"sid": sid, "error": "unknown node"}

    base = f"/nodes/{node}/{typ}/{vmid}"
    current = get_json(["pvesh", "get", f"{base}/status/current"]) or {}
    config = get_json(["pvesh", "get", f"{base}/config"]) or {}

    agent = None
    if typ == "qemu":
        # NOTE(review): get-osinfo / network-get-interfaces are called with
        # "pvesh create"; confirm against the PVE API viewer that these
        # endpoints are POST rather than GET.
        agent = {
            "info": get_json(["pvesh", "get", f"{base}/agent/info"]),
            "osinfo": post_json(["pvesh", "create", f"{base}/agent/get-osinfo"]) or None,
            "ifaces": post_json(["pvesh", "create", f"{base}/agent/network-get-interfaces"]) or None,
        }

    canonical = norm_sid(sid)
    return {
        "sid": canonical,
        "node": node,
        "type": typ,
        "vmid": vmid,
        "meta": meta.get(canonical, {}),
        "current": current,
        "config": config,
        "agent": agent,
    }
# ---------------- Node details ----------------
def node_detail_payload(name: str) -> Dict[str, Any]:
    """Collect the per-node API views shown on the node detail pane.

    Returns ``{"error": "no node"}`` for an empty *name* and
    ``{"error": "bad node name"}`` when *name* is not a plain host-style
    name (letters, digits, "." and "-", starting with a letter or digit).
    *name* arrives from the query string and is interpolated into pvesh API
    paths, so values containing "/" or ".." must not get through.

    Otherwise each key holds the decoded result of one
    ``pvesh get /nodes/<name>/...`` call, or an empty dict/list when that
    call yields nothing.
    """
    if not name:
        return {"error": "no node"}
    if not isinstance(name, str) or not re.fullmatch(r"[A-Za-z0-9][A-Za-z0-9.-]*", name):
        return {"error": "bad node name"}

    def fetch(leaf: str) -> Any:
        return get_json(["pvesh", "get", f"/nodes/{name}/{leaf}"])

    status = fetch("status") or {}
    version = fetch("version") or {}
    timeinfo = fetch("time") or {}
    services = fetch("services") or []
    network_cfg = fetch("network") or []
    netstat = fetch("netstat") or []  # may be empty on some versions
    disks = fetch("disks/list") or []
    subscription = fetch("subscription") or {}
    return {
        "node": name,
        "status": status,
        "version": version,
        "time": timeinfo,
        "services": services,
        "network_cfg": network_cfg,
        "netstat": netstat,
        "disks": disks,
        "subscription": subscription,
    }
def node_ha_services(node: str) -> Dict[str, str]:
    """Report the CRM and LRM service state of *node* from the services API.

    Looks up "pve-ha-crm" and "pve-ha-lrm" in ``/nodes/<node>/services``.
    A service whose "state" (or, failing that, "active") reads "running" or
    "active" is reported as "active"; any other non-empty value is passed
    through; an empty value becomes "inactive".  A service that is not
    listed at all gives "".
    """
    services = get_json(["pvesh", "get", f"/nodes/{node}/services"]) or []

    def state_of(unit: str) -> str:
        entry = next((svc for svc in services if svc.get("name") == unit), None)
        if entry is None:
            return ""
        raw = entry.get("state") or entry.get("active") or ""
        if str(raw).lower() in ("running", "active"):
            return "active"
        return raw or "inactive"

    return {
        "crm_state": state_of("pve-ha-crm"),
        "lrm_state": state_of("pve-ha-lrm"),
    }
def units_for_node(node: str) -> Dict[str, str]:
    """Return "active"/"inactive" for the three HA-related units of *node*.

    States come from ``/nodes/<node>/services`` first (unit names are
    compared with any ".service" suffix removed).  Every unit not found
    active there is re-checked with ``systemctl is-active``; units still
    unknown default to "inactive".

    NOTE(review): the ``systemctl`` re-check runs on the machine hosting
    this app, whichever *node* was asked for — confirm that is intended
    when *node* is a remote node.
    """
    wanted = {"watchdog-mux", "pve-ha-crm", "pve-ha-lrm"}
    svc = get_json(["pvesh", "get", f"/nodes/{node}/services"]) or []
    states: Dict[str, str] = {}

    def norm_state(s: dict) -> str:
        raw_active = str(
            s.get("active", "") or
            s.get("active-state", "") or
            s.get("ActiveState", "") or
            s.get("activestate", "")
        ).lower()
        status = str(s.get("status", "")).lower()
        substate = str(s.get("substate", "")).lower()
        state = str(s.get("state", "")).lower()
        any_active = (
            raw_active in ("active", "running", "1", "true") or
            status in ("active", "running") or
            substate in ("running", "active") or
            # Whole-word match: a plain substring test would treat
            # "inactive" as active because it contains "active".
            re.search(r"\b(?:running|active)\b", state) is not None
        )
        return "active" if any_active else "inactive"

    for s in svc:
        name = re.sub(r"\.service$", "", s.get("name") or "")
        if name in wanted:
            states[name] = norm_state(s)

    for u in wanted:
        if states.get(u) != "active" and is_active(u):
            states[u] = "active"

    for u in wanted:
        states.setdefault(u, "inactive")
    return states
# ---------------- snapshot ----------------
def status_snapshot(node: str) -> Dict[str, Any]:
    """Assemble the complete status document served by ``/api/info``.

    Combines quorum figures, the cluster API views (with enriched nodes),
    the raw and parsed ``ha-manager status`` output, unit states for *node*
    (or this host when *node* is empty) and a few raw tool outputs.

    The HA node list is taken from the API; if that is empty it is rebuilt
    from the parsed ha-manager nodes, and failing that from the "node"
    entries of the cluster status.  Missing CRM/LRM states are then filled
    in from the node's services list on a best-effort basis.
    """
    votequorum = votequorum_brief()
    api = api_cluster_data()
    api["nodes"] = enrich_nodes(api.get("nodes", []))
    ha_raw = get_ha_status_raw().strip()
    parsed = parse_ha_manager(ha_raw)
    vmct_index = cluster_vmct_index()

    ha_status = api.get("ha_status") or []
    if not ha_status and parsed.get("nodes"):
        # First fallback: what the ha-manager text told us about nodes.
        ha_status = [
            {
                "node": item.get("node"),
                "state": item.get("state"),
                "crm_state": item.get("crm", ""),
                "lrm_state": item.get("lrm", ""),
            }
            for item in parsed["nodes"]
        ]
    if not ha_status:
        # Second fallback: plain online/offline from the cluster status.
        ha_status.extend(
            {
                "node": item.get("name"),
                "state": "online" if item.get("online") else "offline",
                "crm_state": "",
                "lrm_state": "",
            }
            for item in api.get("cluster_status", [])
            if item.get("type") == "node"
        )

    enriched = []
    for entry in ha_status:
        node_name = entry.get("node")
        missing = [key for key in ("crm_state", "lrm_state") if not entry.get(key)]
        if node_name and missing:
            try:
                service_states = node_ha_services(node_name)
                for key in missing:
                    entry[key] = service_states.get(key, "")
            except Exception:
                # Best effort only: keep the entry as it is.
                pass
        enriched.append(entry)
    api["ha_status"] = enriched

    api["ha_resources"] = merge_resources(
        api.get("ha_resources", []), parsed.get("resources", []), vmct_index
    )
    units = units_for_node(node or socket.gethostname())

    hostname = socket.gethostname()
    cfgtool = get_cfgtool().strip()
    pvecm = get_pvecm_status().strip()
    replication = get_pvesr_status().strip()
    return {
        "node_arg": node,
        "hostname": hostname,
        "votequorum": votequorum,
        "units": units,
        "cfgtool": cfgtool,
        "pvecm": pvecm,
        "ha_raw": ha_raw,
        "replication": replication,
        "api": api,
        "ts": int(time.time()),
    }
# ---------------- web ----------------
@app.get("/")
def index():
    """Render the panel page for the ``?node=`` argument (default: DEFAULT_NODE)."""
    selected_node = request.args.get("node", DEFAULT_NODE)
    return render_template("index.html", title=APP_TITLE, node=selected_node)
@app.get("/api/info")
def api_info():
    """Return the status snapshot for the ``?node=`` argument as JSON."""
    selected_node = request.args.get("node", DEFAULT_NODE)
    snapshot = status_snapshot(selected_node)
    return jsonify(snapshot)
@app.get("/api/vm")
def api_vm_detail():
    """Return the detail payload for the VM/CT named by ``?sid=`` as JSON."""
    requested_sid = request.args.get("sid", "")
    payload = vm_detail_payload(requested_sid)
    return jsonify(payload)
@app.get("/api/node")
def api_node_detail():
    """Return the detail payload for the node named by ``?name=`` as JSON."""
    requested_name = request.args.get("name", "")
    payload = node_detail_payload(requested_name)
    return jsonify(payload)
@app.get("/api/list-vmct")
def api_list_vmct():
    """List the VMs/CTs that are not HA-managed, plus the HA sid index.

    Response keys: "nonha" (metadata records of guests without an HA
    resource), "ha_index" (normalised HA sids), "count_nonha" and
    "count_all_vmct".
    """
    meta = cluster_vmct_meta()
    # Only the HA resource list is needed, so query that one endpoint
    # instead of collecting every cluster view.
    ha_resources = get_json(["pvesh", "get", "/cluster/ha/resources"]) or []
    ha_sids = {norm_sid(r.get("sid")) for r in ha_resources if r.get("sid")}
    nonha = [v for k, v in meta.items() if k not in ha_sids]
    return jsonify({
        "nonha": nonha, "ha_index": list(ha_sids),
        "count_nonha": len(nonha), "count_all_vmct": len(meta)
    })
@app.post("/api/enable")
def api_enable():
    """Enable HA node-maintenance for a node, then stop the HA units.

    Requires the process to run as root (403 otherwise).  The node is read
    from the JSON body's "node" field, defaulting to DEFAULT_NODE.  The
    response carries the list of logged steps.
    """
    if os.geteuid() != 0:
        return jsonify(ok=False, error="run as root"), 403
    payload = request.get_json(force=True, silent=True) or {}
    target_node = payload.get("node") or DEFAULT_NODE
    log: List[str] = []
    ha_node_maint(True, target_node, log)
    for unit in HA_UNITS_STOP:
        stop_if_running(unit, log)
    return jsonify(ok=True, log=log)
@app.post("/api/disable")
def api_disable():
    """Start the HA units, then disable HA node-maintenance for a node.

    Requires the process to run as root (403 otherwise).  The node is read
    from the JSON body's "node" field, defaulting to DEFAULT_NODE.  The
    response carries the list of logged steps.
    """
    if os.geteuid() != 0:
        return jsonify(ok=False, error="run as root"), 403
    payload = request.get_json(force=True, silent=True) or {}
    target_node = payload.get("node") or DEFAULT_NODE
    log: List[str] = []
    for unit in HA_UNITS_START:
        start_if_needed(unit, log)
    ha_node_maint(False, target_node, log)
    return jsonify(ok=True, log=log)
# --- VM/CT admin actions API ---
def vm_locked(typ: str, node: str, vmid: int) -> bool:
    """Return True when the guest's ``status/current`` has a truthy "lock" field."""
    status = get_json(["pvesh", "get", f"/nodes/{node}/{typ}/{vmid}/status/current"]) or {}
    return bool(status.get("lock"))
def run_qm_pct(typ: str, vmid: int, subcmd: str) -> subprocess.CompletedProcess:
    """Run ``qm <subcmd> <vmid>`` for "qemu" guests, ``pct <subcmd> <vmid>`` otherwise."""
    if typ == "qemu":
        tool = "qm"
    else:
        tool = "pct"
    return run([tool, subcmd, str(vmid)])
@app.get("/api/list-all-vmct")
def api_list_all_vmct():
    """List every VM/CT metadata record plus the sorted, de-duplicated node names."""
    meta = cluster_vmct_meta()
    # Only the node list is needed, so query that one endpoint instead of
    # collecting every cluster view.
    node_entries = get_json(["pvesh", "get", "/nodes"]) or []
    nodes = [n.get("node") for n in node_entries if n.get("node")]
    return jsonify({"all": list(meta.values()), "nodes": sorted(set(nodes))})
@app.post("/api/vm-action")
def api_vm_action():
    """Run an admin action on one VM/CT.

    JSON body: "sid", "action" and, for migration, "target".  Supported
    actions: "unlock" (qemu only), "start", "stop", "shutdown", "migrate"
    and its alias "migrate_offline".

    Responses: 403 when not running as root; 400 for a bad sid, unknown
    node, unknown action or unusable migration target; 500 with the
    exception text if the action raises.  Otherwise a 200 whose "ok" flag
    reflects the exit status of the command that was run.  For migrate the
    decoded pvesh output is returned as "result" ({} when there is none)
    next to "stdout"/"stderr".

    NOTE(review): start/stop/shutdown/unlock invoke ``qm``/``pct`` on the
    machine hosting this app — confirm this is adequate for guests that
    live on another node.
    """
    if os.geteuid() != 0:
        return jsonify(ok=False, error="run as root"), 403
    data = request.get_json(force=True, silent=True) or {}
    sid = data.get("sid", "")
    action = data.get("action", "")
    target = data.get("target", "")

    meta = cluster_vmct_meta()
    tup = sid_to_tuple(sid, meta)
    if not tup:
        return jsonify(ok=False, error="bad sid"), 400
    typ, vmid, node = tup
    if not node:
        return jsonify(ok=False, error="unknown node"), 400

    if action == "migrate_offline":
        action = "migrate"

    try:
        if action == "unlock":
            if typ != "qemu":
                return jsonify(ok=False, error="unlock only for qemu"), 400
            if not vm_locked(typ, node, vmid):
                return jsonify(ok=True, msg="not locked")
            r = run(["qm", "unlock", str(vmid)])
            return jsonify(ok=(r.returncode == 0), stdout=r.stdout, stderr=r.stderr)

        if action in ("start", "stop", "shutdown"):
            r = run_qm_pct(typ, vmid, action)
            return jsonify(ok=(r.returncode == 0), stdout=r.stdout, stderr=r.stderr)

        if action == "migrate":
            if not target or not isinstance(target, str) or target == node:
                return jsonify(ok=False, error="target required and must differ from source"), 400
            cmd = [
                "pvesh", "create", f"/nodes/{node}/{typ}/{vmid}/migrate",
                "-target", target, "-online", "0",
                "--output-format", "json",
            ]
            r = run(cmd)
            try:
                res = json.loads(r.stdout) if r.stdout.strip() else None
            except ValueError:
                res = None
            # Report the real outcome instead of an unconditional ok=True.
            return jsonify(ok=(r.returncode == 0), result=res or {},
                           stdout=r.stdout, stderr=r.stderr)

        return jsonify(ok=False, error="unknown action"), 400
    except Exception as e:
        return jsonify(ok=False, error=str(e)), 500
if __name__ == "__main__":
    # Script entry point: parse --bind / --node and start the Flask server.
    import argparse

    p = argparse.ArgumentParser()
    p.add_argument("--bind", default="127.0.0.1:8007", help="addr:port")
    p.add_argument("--node", default=DEFAULT_NODE, help="default node")
    args = p.parse_args()
    # Rebind the module-level default that the route handlers read.
    DEFAULT_NODE = args.node

    # Split on the LAST colon so IPv6 binds such as "[::1]:8007" work, and
    # report a malformed value as a usage error instead of a traceback.
    host, sep, port = args.bind.rpartition(":")
    if not sep or not port.isdigit():
        p.error("--bind must look like addr:port")
    if host.startswith("[") and host.endswith("]"):
        host = host[1:-1]
    app.run(host=host, port=int(port), debug=False, threaded=True)