583 lines
24 KiB
Python
583 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
# /opt/pve-ha-web/app.py
|
|
import os
|
|
import re
|
|
import shlex
|
|
import socket
|
|
import json
|
|
import time
|
|
import subprocess
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
from flask import Flask, request, jsonify, render_template
|
|
|
|
# NEW: WebSockets
|
|
from flask_sock import Sock
|
|
|
|
APP_TITLE = "PVE HA Panel"
|
|
DEFAULT_NODE = socket.gethostname()
|
|
HA_UNITS_START = ["watchdog-mux", "pve-ha-crm", "pve-ha-lrm"]
|
|
HA_UNITS_STOP = list(reversed(HA_UNITS_START))
|
|
|
|
app = Flask(__name__, template_folder="templates", static_folder="static")
|
|
sock = Sock(app) # NEW
|
|
|
|
# ---------------- exec helpers ----------------
|
|
def run(cmd: List[str], timeout: int = 25) -> subprocess.CompletedProcess:
|
|
return subprocess.run(cmd, check=False, text=True, capture_output=True, timeout=timeout)
|
|
|
|
def get_text(cmd: List[str], timeout: Optional[int] = None) -> str:
|
|
r = run(cmd, timeout=timeout or 25)
|
|
return r.stdout if r.returncode == 0 else ""
|
|
|
|
def get_json(cmd: List[str], timeout: Optional[int] = None) -> Any:
|
|
if cmd and cmd[0] == "pvesh" and "--output-format" not in cmd:
|
|
cmd = cmd + ["--output-format", "json"]
|
|
r = run(cmd, timeout=timeout or 25)
|
|
if r.returncode != 0 or not r.stdout.strip():
|
|
return None
|
|
try:
|
|
return json.loads(r.stdout)
|
|
except Exception:
|
|
return None
|
|
|
|
def post_json(cmd: List[str], timeout: Optional[int] = None) -> Any:
|
|
# force "create" for POST-like agent calls
|
|
if cmd and cmd[0] == "pvesh" and len(cmd) > 2 and cmd[1] != "create":
|
|
cmd = ["pvesh", "create"] + cmd[1:]
|
|
r = run(cmd, timeout=timeout or 25)
|
|
if r.returncode != 0:
|
|
# pvesh create zwykle zwraca JSON tylko przy 200
|
|
try:
|
|
return json.loads(r.stdout) if r.stdout.strip() else {"error": r.stderr.strip(), "rc": r.returncode}
|
|
except Exception:
|
|
return {"error": r.stderr.strip(), "rc": r.returncode}
|
|
try:
|
|
return json.loads(r.stdout) if r.stdout.strip() else {}
|
|
except Exception:
|
|
return {}
|
|
|
|
def is_active(unit: str) -> bool:
|
|
return run(["systemctl", "is-active", "--quiet", unit]).returncode == 0
|
|
|
|
def start_if_needed(unit: str, out: List[str]) -> None:
|
|
if not is_active(unit):
|
|
out.append(f"+ start {unit}")
|
|
run(["systemctl", "start", unit])
|
|
|
|
def stop_if_running(unit: str, out: List[str]) -> None:
|
|
if is_active(unit):
|
|
out.append(f"- stop {unit}")
|
|
run(["systemctl", "stop", unit])
|
|
|
|
def ha_node_maint(enable: bool, node: str, out: List[str]) -> None:
|
|
cmd = ["ha-manager", "crm-command", "node-maintenance", "enable" if enable else "disable", node]
|
|
out.append("$ " + " ".join(shlex.quote(x) for x in cmd))
|
|
r = run(cmd, timeout=25)
|
|
if r.returncode != 0:
|
|
out.append(f"ERR: {r.stderr.strip()}")
|
|
|
|
# ---------------- collectors ----------------
|
|
def get_pvecm_status() -> str: return get_text(["pvecm", "status"])
|
|
def get_quorumtool(short: bool = True) -> str: return get_text(["corosync-quorumtool", "-s" if short else "-l"])
|
|
def get_cfgtool() -> str: return get_text(["corosync-cfgtool", "-s"])
|
|
def get_ha_status_raw() -> str: return get_text(["ha-manager", "status"])
|
|
def get_pvesr_status() -> str: return get_text(["pvesr", "status"])
|
|
|
|
def votequorum_brief() -> Dict[str, Any]:
|
|
out = get_quorumtool(True)
|
|
rv: Dict[str, Any] = {}
|
|
rx = {
|
|
"expected": r"Expected votes:\s*(\d+)",
|
|
"total": r"Total votes:\s*(\d+)",
|
|
"quorum": r"Quorum:\s*(\d+)",
|
|
"quorate": r"Quorate:\s*(Yes|No)"
|
|
}
|
|
for k, rgx in rx.items():
|
|
m = re.search(rgx, out, re.I)
|
|
rv[k] = (None if not m else (m.group(1).lower() if k == "quorate" else int(m.group(1))))
|
|
out_l = get_quorumtool(False)
|
|
lines = [ln for ln in out_l.splitlines() if re.match(r"^\s*\d+\s+\d+\s+\S+", ln)]
|
|
rv["members"] = len(lines) if lines else None
|
|
return rv
|
|
|
|
def api_cluster_data() -> Dict[str, Any]:
|
|
return {
|
|
"cluster_status": get_json(["pvesh", "get", "/cluster/status"]) or [],
|
|
"ha_status": get_json(["pvesh", "get", "/cluster/ha/status"]) or [],
|
|
"ha_resources": get_json(["pvesh", "get", "/cluster/ha/resources"]) or [],
|
|
"ha_groups": get_json(["pvesh", "get", "/cluster/ha/groups"]) or [],
|
|
"nodes": get_json(["pvesh", "get", "/nodes"]) or [],
|
|
}
|
|
|
|
def enrich_nodes(nodes_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
out: List[Dict[str, Any]] = []
|
|
for n in nodes_list or []:
|
|
name = n.get("node")
|
|
if not name:
|
|
out.append(n); continue
|
|
detail = get_json(["pvesh", "get", f"/nodes/{name}/status"]) or {}
|
|
if "loadavg" in detail: n["loadavg"] = detail["loadavg"]
|
|
if "cpu" in detail: n["cpu"] = detail["cpu"]
|
|
if "memory" in detail:
|
|
n["mem"] = detail["memory"].get("used"); n["maxmem"] = detail["memory"].get("total")
|
|
if "rootfs" in detail:
|
|
n["rootfs"] = detail["rootfs"].get("used"); n["maxrootfs"] = detail["rootfs"].get("total")
|
|
out.append(n)
|
|
return out
|
|
|
|
# ---------------- ha-manager parser ----------------
|
|
def parse_ha_manager(text: str) -> Dict[str, Any]:
|
|
nodes: Dict[str, Dict[str, str]] = {}
|
|
resources: Dict[str, Dict[str, str]] = {}
|
|
current_node: Optional[str] = None
|
|
for line in (text or "").splitlines():
|
|
s = line.strip()
|
|
m = re.match(r"node:\s+(\S+)\s+\(([^)]+)\)", s)
|
|
if m:
|
|
current_node = m.group(1)
|
|
nodes.setdefault(current_node, {"node": current_node, "state": m.group(2)})
|
|
continue
|
|
m = re.match(r"(lrm|crm)\s+status:\s+(\S+)", s)
|
|
if m and current_node:
|
|
nodes[current_node]["lrm" if m.group(1)=="lrm" else "crm"] = m.group(2)
|
|
continue
|
|
m = re.match(r"service:\s+(\S+)\s+on\s+(\S+)\s+\(([^)]+)\)", s)
|
|
if m:
|
|
sid, node, flags = m.group(1), m.group(2), m.group(3)
|
|
rec = {"sid": sid, "node": node, "flags": flags}
|
|
rec["state"] = "started" if "started" in flags else ("stopped" if "stopped" in flags else rec.get("state"))
|
|
resources[sid] = rec
|
|
continue
|
|
m = re.match(r"service:\s+(\S+)\s+\(([^)]*)\)\s+on\s+(\S+)", s)
|
|
if m:
|
|
sid, flags, node = m.group(1), m.group(2), m.group(3)
|
|
rec = {"sid": sid, "node": node, "flags": flags}
|
|
rec["state"] = "started" if "started" in flags else ("stopped" if "stopped" in flags else rec.get("state"))
|
|
resources[sid] = rec
|
|
continue
|
|
m = re.match(r"service\s+(\S+):\s+(\S+)\s+on\s+(\S+)", s)
|
|
if m:
|
|
sid, st, node = m.group(1), m.group(2), m.group(3)
|
|
resources[sid] = {"sid": sid, "state": st, "node": node}
|
|
continue
|
|
return {"nodes": list(nodes.values()), "resources": list(resources.values())}
|
|
|
|
# ---------------- SID utils / indexes ----------------
|
|
def norm_sid(s: Optional[str]) -> Optional[str]:
|
|
if not s: return None
|
|
s = str(s)
|
|
m = re.match(r"^(vm|ct):(\d+)$", s)
|
|
if m: return f"{m.group(1)}:{m.group(2)}"
|
|
m = re.match(r"^(qemu|lxc)/(\d+)$", s)
|
|
if m: return ("vm" if m.group(1) == "qemu" else "ct") + f":{m.group(2)}"
|
|
return s
|
|
|
|
def cluster_vmct_index() -> Dict[str, str]:
|
|
items = get_json(["pvesh", "get", "/cluster/resources"]) or []
|
|
idx: Dict[str, str] = {}
|
|
for it in items:
|
|
t = it.get("type")
|
|
if t not in ("qemu", "lxc"): continue
|
|
vmid = it.get("vmid"); node = it.get("node")
|
|
sid = (("vm" if t=="qemu" else "ct") + f":{vmid}") if vmid is not None else norm_sid(it.get("id"))
|
|
if sid and node: idx[sid] = node
|
|
return idx
|
|
|
|
def cluster_vmct_meta() -> Dict[str, Dict[str, Any]]:
|
|
items = get_json(["pvesh", "get", "/cluster/resources"]) or []
|
|
meta: Dict[str, Dict[str, Any]] = {}
|
|
for it in items:
|
|
t = it.get("type")
|
|
if t not in ("qemu", "lxc"): continue
|
|
sid = norm_sid(it.get("id")) or (("vm" if t == "qemu" else "ct") + f":{it.get('vmid')}")
|
|
if sid:
|
|
meta[sid] = {
|
|
"sid": sid, "type": t, "vmid": it.get("vmid"),
|
|
"node": it.get("node"), "name": it.get("name"),
|
|
"status": it.get("status"), "hastate": it.get("hastate")
|
|
}
|
|
return meta
|
|
|
|
def merge_resources(api_res: List[Dict[str, Any]],
|
|
parsed_res: List[Dict[str, Any]],
|
|
vmct_idx: Dict[str, str]) -> List[Dict[str, Any]]:
|
|
by_sid: Dict[str, Dict[str, Any]] = {}
|
|
for r in (api_res or []):
|
|
sid = norm_sid(r.get("sid"))
|
|
if not sid: continue
|
|
x = dict(r); x["sid"] = sid; by_sid[sid] = x
|
|
for r in (parsed_res or []):
|
|
sid = norm_sid(r.get("sid"))
|
|
if not sid: continue
|
|
x = by_sid.get(sid, {"sid": sid})
|
|
for k, v in r.items():
|
|
if k == "sid": continue
|
|
if v not in (None, ""): x[k] = v
|
|
by_sid[sid] = x
|
|
for sid, x in by_sid.items():
|
|
if not x.get("node") and sid in vmct_idx:
|
|
x["node"] = vmct_idx[sid]
|
|
return list(by_sid.values())
|
|
|
|
# ---------------- VM details ----------------
|
|
def sid_to_tuple(sid: str, meta: Dict[str, Dict[str, Any]]) -> Optional[Tuple[str, int, str]]:
|
|
sid_n = norm_sid(sid)
|
|
if not sid_n: return None
|
|
m = re.match(r"^(vm|ct):(\d+)$", sid_n)
|
|
if not m: return None
|
|
typ = "qemu" if m.group(1) == "vm" else "lxc"
|
|
vmid = int(m.group(2))
|
|
node = (meta.get(sid_n) or {}).get("node")
|
|
return (typ, vmid, node)
|
|
|
|
def vm_detail_payload(sid: str) -> Dict[str, Any]:
|
|
meta = cluster_vmct_meta()
|
|
tup = sid_to_tuple(sid, meta)
|
|
if not tup: return {"sid": sid, "error": "bad sid"}
|
|
typ, vmid, node = tup
|
|
if not node: return {"sid": sid, "error": "unknown node"}
|
|
base = f"/nodes/{node}/{typ}/{vmid}"
|
|
current = get_json(["pvesh", "get", f"{base}/status/current"]) or {}
|
|
config = get_json(["pvesh", "get", f"{base}/config"]) or {}
|
|
agent_info = None; agent_os = None; agent_ifaces = None
|
|
if typ == "qemu":
|
|
agent_info = get_json(["pvesh", "get", f"{base}/agent/info"])
|
|
agent_os = post_json(["pvesh", "create", f"{base}/agent/get-osinfo"]) or None
|
|
agent_ifaces = post_json(["pvesh", "create", f"{base}/agent/network-get-interfaces"]) or None
|
|
return {
|
|
"sid": norm_sid(sid), "node": node, "type": typ, "vmid": vmid,
|
|
"meta": meta.get(norm_sid(sid), {}),
|
|
"current": current, "config": config,
|
|
"agent": {"info": agent_info, "osinfo": agent_os, "ifaces": agent_ifaces} if typ=="qemu" else None
|
|
}
|
|
|
|
# ---------------- Node details ----------------
|
|
def node_detail_payload(name: str) -> Dict[str, Any]:
|
|
if not name: return {"error": "no node"}
|
|
status = get_json(["pvesh", "get", f"/nodes/{name}/status"]) or {}
|
|
version = get_json(["pvesh", "get", f"/nodes/{name}/version"]) or {}
|
|
timeinfo = get_json(["pvesh", "get", f"/nodes/{name}/time"]) or {}
|
|
services = get_json(["pvesh", "get", f"/nodes/{name}/services"]) or []
|
|
network_cfg = get_json(["pvesh", "get", f"/nodes/{name}/network"]) or []
|
|
netstat = get_json(["pvesh", "get", f"/nodes/{name}/netstat"]) or []
|
|
disks = get_json(["pvesh", "get", f"/nodes/{name}/disks/list"]) or []
|
|
subscription = get_json(["pvesh", "get", f"/nodes/{name}/subscription"]) or {}
|
|
return {"node": name,"status": status,"version": version,"time": timeinfo,"services": services,
|
|
"network_cfg": network_cfg,"netstat": netstat,"disks": disks,"subscription": subscription}
|
|
|
|
def node_ha_services(node: str) -> Dict[str, str]:
|
|
svcs = get_json(["pvesh", "get", f"/nodes/{node}/services"]) or []
|
|
def one(name: str) -> str:
|
|
for s in svcs:
|
|
if s.get("name") == name:
|
|
st = s.get("state") or s.get("active") or ""
|
|
return "active" if str(st).lower() in ("running", "active") else (st or "inactive")
|
|
return ""
|
|
return {"crm_state": one("pve-ha-crm"), "lrm_state": one("pve-ha-lrm")}
|
|
|
|
def units_for_node(node: str) -> Dict[str, str]:
|
|
wanted = {"watchdog-mux", "pve-ha-crm", "pve-ha-lrm"}
|
|
svc = get_json(["pvesh", "get", f"/nodes/{node}/services"]) or []
|
|
states: Dict[str, str] = {}
|
|
def norm_state(s: dict) -> str:
|
|
raw_active = str((s.get("active","") or s.get("active-state","") or s.get("ActiveState","") or s.get("activestate",""))).lower()
|
|
status = str(s.get("status","")).lower()
|
|
substate = str(s.get("substate","")).lower()
|
|
state = str(s.get("state","")).lower()
|
|
any_active = (raw_active in ("active","running","1","true") or status in ("active","running") or substate in ("running","active") or ("running" in state or "active" in state))
|
|
return "active" if any_active else "inactive"
|
|
for s in svc:
|
|
name_raw = (s.get("name") or ""); name = re.sub(r"\.service$","",name_raw)
|
|
if name in wanted: states[name] = norm_state(s)
|
|
for u in wanted:
|
|
if states.get(u) != "active" and is_active(u):
|
|
states[u] = "active"
|
|
for u in wanted: states.setdefault(u, "inactive")
|
|
return states
|
|
|
|
# ---------------- snapshot ----------------
|
|
def status_snapshot(node: str) -> Dict[str, Any]:
|
|
vq = votequorum_brief()
|
|
api = api_cluster_data()
|
|
api["nodes"] = enrich_nodes(api.get("nodes", []))
|
|
ha_raw = get_ha_status_raw().strip()
|
|
parsed = parse_ha_manager(ha_raw)
|
|
vmct_ix = cluster_vmct_index()
|
|
|
|
ha_status = api.get("ha_status") or []
|
|
if not ha_status and parsed.get("nodes"):
|
|
ha_status = [{
|
|
"node": n.get("node"), "state": n.get("state"),
|
|
"crm_state": n.get("crm",""), "lrm_state": n.get("lrm","")
|
|
} for n in parsed["nodes"]]
|
|
if not ha_status:
|
|
for it in api.get("cluster_status", []):
|
|
if it.get("type") == "node":
|
|
ha_status.append({"node": it.get("name"),"state": "online" if it.get("online") else "offline","crm_state":"", "lrm_state":""})
|
|
|
|
enriched = []
|
|
for n in ha_status:
|
|
node_name = n.get("node"); crm = n.get("crm_state") or ""; lrm = n.get("lrm_state") or ""
|
|
if node_name and (not crm or not lrm):
|
|
try:
|
|
svc = node_ha_services(node_name)
|
|
if not crm: n["crm_state"] = svc.get("crm_state","")
|
|
if not lrm: n["lrm_state"] = svc.get("lrm_state","")
|
|
except Exception:
|
|
pass
|
|
enriched.append(n)
|
|
api["ha_status"] = enriched
|
|
api["ha_resources"] = merge_resources(api.get("ha_resources", []), parsed.get("resources", []), vmct_ix)
|
|
units = units_for_node(node or socket.gethostname())
|
|
return {"node_arg": node, "hostname": socket.gethostname(), "votequorum": vq, "units": units,
|
|
"cfgtool": get_cfgtool().strip(), "pvecm": get_pvecm_status().strip(),
|
|
"ha_raw": ha_raw, "replication": get_pvesr_status().strip(), "api": api, "ts": int(time.time())}
|
|
|
|
# ---------------- web ----------------
|
|
@app.get("/")
|
|
def index():
|
|
node = request.args.get("node", DEFAULT_NODE)
|
|
return render_template("index.html", title=APP_TITLE, node=node)
|
|
|
|
# Stary zbiorczy snapshot — zostaje
|
|
@app.get("/api/info")
|
|
def api_info():
|
|
node = request.args.get("node", DEFAULT_NODE)
|
|
return jsonify(status_snapshot(node))
|
|
|
|
# --- NOWE lżejsze endpointy (szybsze ładowanie strony) ---
|
|
|
|
@app.get("/api/cluster")
|
|
def api_cluster_brief():
|
|
vq = votequorum_brief()
|
|
api = api_cluster_data()
|
|
ha_raw = get_ha_status_raw().strip()
|
|
parsed = parse_ha_manager(ha_raw)
|
|
vmct_ix = cluster_vmct_index()
|
|
api["ha_resources"] = merge_resources(api.get("ha_resources", []), parsed.get("resources", []), vmct_ix)
|
|
return jsonify({
|
|
"votequorum": vq,
|
|
"cluster_status": api.get("cluster_status", []),
|
|
"ha_resources": api.get("ha_resources", []),
|
|
"pvecm": get_pvecm_status().strip(),
|
|
"cfgtool": get_cfgtool().strip(),
|
|
"hostname": socket.gethostname(),
|
|
"ts": int(time.time())
|
|
})
|
|
|
|
@app.get("/api/nodes/summary")
|
|
def api_nodes_summary():
|
|
nodes = enrich_nodes((api_cluster_data().get("nodes") or []))
|
|
return jsonify({ "nodes": nodes })
|
|
|
|
@app.get("/api/units")
|
|
def api_units():
|
|
node = request.args.get("node", DEFAULT_NODE)
|
|
return jsonify({ "units": units_for_node(node) })
|
|
|
|
# Replication ze wszystkich nodów
|
|
@app.get("/api/replication/all")
|
|
def api_replication_all():
|
|
jobs: List[Dict[str, Any]] = []
|
|
nodes = [n.get("node") for n in (api_cluster_data().get("nodes") or []) if n.get("node")]
|
|
for name in nodes:
|
|
data = get_json(["pvesh", "get", f"/nodes/{name}/replication"]) or get_json(["pvesh","get",f"/nodes/{name}/replication/jobs"]) or []
|
|
for it in (data or []):
|
|
jobs.append({
|
|
"node": name,
|
|
"job": it.get("id") or it.get("job") or "",
|
|
"enabled": "yes" if (it.get("enable",1) in (1, "1", True)) else "no",
|
|
"target": it.get("target") or it.get("target-node") or "",
|
|
"last": it.get("last_sync") or it.get("last_sync", ""),
|
|
"next": it.get("next_sync") or it.get("next_sync", ""),
|
|
"dur": it.get("duration") or it.get("duration", ""),
|
|
"fail": int(it.get("fail_count") or it.get("failcount") or 0),
|
|
"state": it.get("state") or it.get("status") or ""
|
|
})
|
|
return jsonify({ "jobs": jobs })
|
|
|
|
# --- Detale ---
|
|
@app.get("/api/vm")
|
|
def api_vm_detail():
|
|
sid = request.args.get("sid", "")
|
|
return jsonify(vm_detail_payload(sid))
|
|
|
|
@app.get("/api/node")
|
|
def api_node_detail():
|
|
name = request.args.get("name", "")
|
|
return jsonify(node_detail_payload(name))
|
|
|
|
@app.get("/api/list-vmct")
|
|
def api_list_vmct():
|
|
meta = cluster_vmct_meta()
|
|
ha_sids = {norm_sid(r.get("sid")) for r in (api_cluster_data().get("ha_resources") or []) if r.get("sid")}
|
|
nonha = [v for k, v in meta.items() if k not in ha_sids]
|
|
return jsonify({"nonha": nonha, "ha_index": list(ha_sids), "count_nonha": len(nonha), "count_all_vmct": len(meta)})
|
|
|
|
# --- Enable/Disable maintenance ---
|
|
@app.post("/api/enable")
|
|
def api_enable():
|
|
if os.geteuid() != 0:
|
|
return jsonify(ok=False, error="run as root"), 403
|
|
data = request.get_json(force=True, silent=True) or {}
|
|
node = data.get("node") or DEFAULT_NODE
|
|
log: List[str] = []
|
|
ha_node_maint(True, node, log)
|
|
for u in HA_UNITS_STOP: stop_if_running(u, log)
|
|
return jsonify(ok=True, log=log)
|
|
|
|
@app.post("/api/disable")
|
|
def api_disable():
|
|
if os.geteuid() != 0:
|
|
return jsonify(ok=False, error="run as root"), 403
|
|
data = request.get_json(force=True, silent=True) or {}
|
|
node = data.get("node") or DEFAULT_NODE
|
|
log: List[str] = []
|
|
for u in HA_UNITS_START: start_if_needed(u, log)
|
|
ha_node_maint(False, node, log)
|
|
return jsonify(ok=True, log=log)
|
|
|
|
# --- VM/CT admin actions API (zwrot UPID dla live) ---
|
|
|
|
def vm_locked(typ: str, node: str, vmid: int) -> bool:
|
|
base = f"/nodes/{node}/{typ}/{vmid}"
|
|
cur = get_json(["pvesh", "get", f"{base}/status/current"]) or {}
|
|
return bool(cur.get("lock"))
|
|
|
|
def run_qm_pct(typ: str, vmid: int, subcmd: str) -> subprocess.CompletedProcess:
|
|
tool = "qm" if typ == "qemu" else "pct"
|
|
return run([tool, subcmd, str(vmid)], timeout=120)
|
|
|
|
@app.get("/api/list-all-vmct")
|
|
def api_list_all_vmct():
|
|
meta = cluster_vmct_meta()
|
|
nodes = [n.get("node") for n in (api_cluster_data().get("nodes") or []) if n.get("node")]
|
|
return jsonify({"all": list(meta.values()), "nodes": sorted(set(nodes))})
|
|
|
|
@app.post("/api/vm-action")
|
|
def api_vm_action():
|
|
if os.geteuid() != 0:
|
|
return jsonify(ok=False, error="run as root"), 403
|
|
data = request.get_json(force=True, silent=True) or {}
|
|
sid = data.get("sid", "")
|
|
action = data.get("action", "")
|
|
target = data.get("target", "")
|
|
|
|
meta = cluster_vmct_meta()
|
|
tup = sid_to_tuple(sid, meta)
|
|
if not tup:
|
|
return jsonify(ok=False, error="bad sid"), 400
|
|
typ, vmid, node = tup
|
|
if not node:
|
|
return jsonify(ok=False, error="unknown node"), 400
|
|
|
|
try:
|
|
if action == "unlock":
|
|
# brak UPID (działa natychmiast)
|
|
if typ != "qemu":
|
|
return jsonify(ok=False, error="unlock only for qemu"), 400
|
|
if not vm_locked(typ, node, vmid):
|
|
return jsonify(ok=True, msg="not locked")
|
|
r = run(["qm", "unlock", str(vmid)])
|
|
return jsonify(ok=(r.returncode == 0), stdout=r.stdout, stderr=r.stderr, upid=None, source_node=node)
|
|
|
|
elif action in ("start", "stop", "shutdown"):
|
|
# PVE API -> zwraca UPID
|
|
base = f"/nodes/{node}/{typ}/{vmid}/status/{action}"
|
|
res = post_json(["pvesh", "create", base], timeout=120) or {}
|
|
upid = None
|
|
if isinstance(res, dict):
|
|
upid = res.get("data") or res.get("upid")
|
|
if not upid:
|
|
# fallback do qm/pct (bez UPID), ale zwrócimy ok
|
|
r = run_qm_pct(typ, vmid, action)
|
|
return jsonify(ok=(r.returncode == 0), stdout=r.stdout, stderr=r.stderr, upid=None, source_node=node)
|
|
return jsonify(ok=True, result=res, upid=upid, source_node=node)
|
|
|
|
elif action == "migrate" or action == "migrate_offline":
|
|
if action == "migrate_offline": # zgodność wstecz
|
|
action = "migrate"
|
|
if not target or target == node:
|
|
return jsonify(ok=False, error="target required and must differ from source"), 400
|
|
base = f"/nodes/{node}/{typ}/{vmid}/migrate"
|
|
res = post_json(["pvesh", "create", base, "-target", target, "-online", "0"], timeout=120) or {}
|
|
upid = None
|
|
if isinstance(res, dict):
|
|
upid = res.get("data") or res.get("upid")
|
|
return jsonify(ok=True, result=res, upid=upid, source_node=node)
|
|
|
|
else:
|
|
return jsonify(ok=False, error="unknown action"), 400
|
|
|
|
except Exception as e:
|
|
return jsonify(ok=False, error=str(e)), 500
|
|
|
|
|
|
# ---------------- WebSocket: broadcast observe per sid ----------------
|
|
@sock.route("/ws/observe")
|
|
def ws_observe(ws):
|
|
q = ws.environ.get("QUERY_STRING", "")
|
|
params = {}
|
|
for part in q.split("&"):
|
|
if not part: continue
|
|
k, _, v = part.partition("="); params[k] = v
|
|
sid_raw = (params.get("sid") or "").strip()
|
|
sid = norm_sid(sid_raw)
|
|
if not sid:
|
|
ws.send(json.dumps({"type":"error","error":"sid required"})); return
|
|
|
|
def resolve_tuple():
|
|
meta = cluster_vmct_meta()
|
|
return sid_to_tuple(sid, meta)
|
|
|
|
tup = resolve_tuple()
|
|
if not tup:
|
|
ws.send(json.dumps({"type":"error","error":"unknown sid"})); return
|
|
typ, vmid, node = tup
|
|
if not node:
|
|
ws.send(json.dumps({"type":"error","error":"could not resolve node"})); return
|
|
|
|
seen_upids = set()
|
|
prev_node = node
|
|
|
|
try:
|
|
while True:
|
|
ntup = resolve_tuple()
|
|
if ntup:
|
|
_, _, cur_node = ntup
|
|
if cur_node and cur_node != node:
|
|
ws.send(json.dumps({"type":"moved","old_node":node,"new_node":cur_node,"meta":{"sid":sid,"vmid":vmid,"typ":typ}}))
|
|
prev_node, node = node, cur_node
|
|
|
|
nodes_to_scan = [node] + ([prev_node] if prev_node and prev_node != node else [])
|
|
for nX in nodes_to_scan:
|
|
tasks = get_json(["pvesh","get",f"/nodes/{nX}/tasks","-limit","50"]) or []
|
|
for t in tasks:
|
|
upid = t.get("upid") if isinstance(t, dict) else None
|
|
tid = (t.get("id") or "") if isinstance(t, dict) else ""
|
|
if not upid or not isinstance(upid, str): continue
|
|
if (str(vmid) in tid) or (f"{'qemu' if typ=='qemu' else 'lxc'}/{vmid}" in tid):
|
|
st = get_json(["pvesh","get",f"/nodes/{nX}/tasks/{upid}/status"]) or {}
|
|
ws.send(json.dumps({"type":"task","upid":upid,"node":nX}))
|
|
if upid not in seen_upids and str(st.get("status","")).lower() != "stopped":
|
|
seen_upids.add(upid)
|
|
ws.send(json.dumps({"type":"task-start","upid":upid,"node":nX}))
|
|
if str(st.get("status","")).lower() == "stopped" or st.get("exitstatus"):
|
|
ok = str(st.get("exitstatus","")).upper() == "OK"
|
|
ws.send(json.dumps({"type":"done","upid":upid,"ok":ok,"node":nX}))
|
|
time.sleep(1.8)
|
|
except Exception:
|
|
try: ws.close()
|
|
except Exception: pass
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import argparse
|
|
p = argparse.ArgumentParser()
|
|
p.add_argument("--bind", default="127.0.0.1:8007", help="addr:port")
|
|
p.add_argument("--node", default=DEFAULT_NODE, help="default node")
|
|
args = p.parse_args()
|
|
DEFAULT_NODE = args.node
|
|
host, port = args.bind.split(":")
|
|
app.run(host=host, port=int(port), debug=False, threaded=True)
|