#!/usr/bin/env python3
# /opt/pve-ha-web/app.py
import os
import re
import shlex
import socket
import json
import time
import subprocess
from typing import List, Dict, Any, Optional, Tuple
from flask import Flask, request, jsonify, render_template
# NEW: WebSockets
from flask_sock import Sock
APP_TITLE = "PVE HA Panel"
DEFAULT_NODE = socket.gethostname()
HA_UNITS_START = ["watchdog-mux", "pve-ha-crm", "pve-ha-lrm"]
HA_UNITS_STOP = list(reversed(HA_UNITS_START))
app = Flask(__name__, template_folder="templates", static_folder="static")
sock = Sock(app) # NEW
# ---------------- exec helpers ----------------
def run(cmd: List[str], timeout: int = 25) -> subprocess.CompletedProcess:
return subprocess.run(cmd, check=False, text=True, capture_output=True, timeout=timeout)
def get_text(cmd: List[str], timeout: Optional[int] = None) -> str:
r = run(cmd, timeout=timeout or 25)
return r.stdout if r.returncode == 0 else ""
def get_json(cmd: List[str], timeout: Optional[int] = None) -> Any:
if cmd and cmd[0] == "pvesh" and "--output-format" not in cmd:
cmd = cmd + ["--output-format", "json"]
r = run(cmd, timeout=timeout or 25)
if r.returncode != 0 or not r.stdout.strip():
return None
try:
return json.loads(r.stdout)
except Exception:
return None
def post_json(cmd: List[str], timeout: Optional[int] = None) -> Any:
# force "create" for POST-like agent calls
if cmd and cmd[0] == "pvesh" and len(cmd) > 2 and cmd[1] != "create":
cmd = ["pvesh", "create"] + cmd[1:]
r = run(cmd, timeout=timeout or 25)
if r.returncode != 0:
        # pvesh create usually returns JSON only on HTTP 200
try:
return json.loads(r.stdout) if r.stdout.strip() else {"error": r.stderr.strip(), "rc": r.returncode}
except Exception:
return {"error": r.stderr.strip(), "rc": r.returncode}
try:
return json.loads(r.stdout) if r.stdout.strip() else {}
except Exception:
return {}
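# Illustrative note: both JSON helpers shell out to pvesh. For example,
#   get_json(["pvesh", "get", "/cluster/status"])
# effectively runs `pvesh get /cluster/status --output-format json` and returns the
# decoded structure, or None on a non-zero exit or empty output; post_json() forces
# the "create" verb and falls back to an {"error": ..., "rc": ...} dict on failure.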
def is_active(unit: str) -> bool:
return run(["systemctl", "is-active", "--quiet", unit]).returncode == 0
def start_if_needed(unit: str, out: List[str]) -> None:
if not is_active(unit):
out.append(f"+ start {unit}")
run(["systemctl", "start", unit])
def stop_if_running(unit: str, out: List[str]) -> None:
if is_active(unit):
out.append(f"- stop {unit}")
run(["systemctl", "stop", unit])
def ha_node_maint(enable: bool, node: str, out: List[str]) -> None:
cmd = ["ha-manager", "crm-command", "node-maintenance", "enable" if enable else "disable", node]
out.append("$ " + " ".join(shlex.quote(x) for x in cmd))
r = run(cmd, timeout=25)
if r.returncode != 0:
out.append(f"ERR: {r.stderr.strip()}")
# ---------------- collectors ----------------
def get_pvecm_status() -> str: return get_text(["pvecm", "status"])
def get_quorumtool(short: bool = True) -> str: return get_text(["corosync-quorumtool", "-s" if short else "-l"])
def get_cfgtool() -> str: return get_text(["corosync-cfgtool", "-s"])
def get_ha_status_raw() -> str: return get_text(["ha-manager", "status"])
def get_pvesr_status() -> str: return get_text(["pvesr", "status"])
def votequorum_brief() -> Dict[str, Any]:
out = get_quorumtool(True)
rv: Dict[str, Any] = {}
rx = {
"expected": r"Expected votes:\s*(\d+)",
"total": r"Total votes:\s*(\d+)",
"quorum": r"Quorum:\s*(\d+)",
"quorate": r"Quorate:\s*(Yes|No)"
}
for k, rgx in rx.items():
m = re.search(rgx, out, re.I)
rv[k] = (None if not m else (m.group(1).lower() if k == "quorate" else int(m.group(1))))
out_l = get_quorumtool(False)
lines = [ln for ln in out_l.splitlines() if re.match(r"^\s*\d+\s+\d+\s+\S+", ln)]
rv["members"] = len(lines) if lines else None
return rv
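# Illustrative mapping from `corosync-quorumtool -s` output to the returned dict:
#   Expected votes:   3    -> rv["expected"] = 3
#   Total votes:      3    -> rv["total"]    = 3
#   Quorum:           2    -> rv["quorum"]   = 2
#   Quorate:          Yes  -> rv["quorate"]  = "yes"
# rv["members"] counts the membership rows reported by `corosync-quorumtool -l`.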
def api_cluster_data() -> Dict[str, Any]:
return {
"cluster_status": get_json(["pvesh", "get", "/cluster/status"]) or [],
"ha_status": get_json(["pvesh", "get", "/cluster/ha/status"]) or [],
"ha_resources": get_json(["pvesh", "get", "/cluster/ha/resources"]) or [],
"ha_groups": get_json(["pvesh", "get", "/cluster/ha/groups"]) or [],
"nodes": get_json(["pvesh", "get", "/nodes"]) or [],
}
def enrich_nodes(nodes_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
out: List[Dict[str, Any]] = []
for n in nodes_list or []:
name = n.get("node")
if not name:
out.append(n); continue
detail = get_json(["pvesh", "get", f"/nodes/{name}/status"]) or {}
if "loadavg" in detail: n["loadavg"] = detail["loadavg"]
if "cpu" in detail: n["cpu"] = detail["cpu"]
if "memory" in detail:
n["mem"] = detail["memory"].get("used"); n["maxmem"] = detail["memory"].get("total")
if "rootfs" in detail:
n["rootfs"] = detail["rootfs"].get("used"); n["maxrootfs"] = detail["rootfs"].get("total")
out.append(n)
return out
# ---------------- ha-manager parser ----------------
def parse_ha_manager(text: str) -> Dict[str, Any]:
nodes: Dict[str, Dict[str, str]] = {}
resources: Dict[str, Dict[str, str]] = {}
current_node: Optional[str] = None
for line in (text or "").splitlines():
s = line.strip()
m = re.match(r"node:\s+(\S+)\s+\(([^)]+)\)", s)
if m:
current_node = m.group(1)
nodes.setdefault(current_node, {"node": current_node, "state": m.group(2)})
continue
m = re.match(r"(lrm|crm)\s+status:\s+(\S+)", s)
if m and current_node:
nodes[current_node]["lrm" if m.group(1)=="lrm" else "crm"] = m.group(2)
continue
m = re.match(r"service:\s+(\S+)\s+on\s+(\S+)\s+\(([^)]+)\)", s)
if m:
sid, node, flags = m.group(1), m.group(2), m.group(3)
rec = {"sid": sid, "node": node, "flags": flags}
rec["state"] = "started" if "started" in flags else ("stopped" if "stopped" in flags else rec.get("state"))
resources[sid] = rec
continue
m = re.match(r"service:\s+(\S+)\s+\(([^)]*)\)\s+on\s+(\S+)", s)
if m:
sid, flags, node = m.group(1), m.group(2), m.group(3)
rec = {"sid": sid, "node": node, "flags": flags}
rec["state"] = "started" if "started" in flags else ("stopped" if "stopped" in flags else rec.get("state"))
resources[sid] = rec
continue
m = re.match(r"service\s+(\S+):\s+(\S+)\s+on\s+(\S+)", s)
if m:
sid, st, node = m.group(1), m.group(2), m.group(3)
resources[sid] = {"sid": sid, "state": st, "node": node}
continue
return {"nodes": list(nodes.values()), "resources": list(resources.values())}
# ---------------- SID utils / indexes ----------------
def norm_sid(s: Optional[str]) -> Optional[str]:
if not s: return None
s = str(s)
m = re.match(r"^(vm|ct):(\d+)$", s)
if m: return f"{m.group(1)}:{m.group(2)}"
m = re.match(r"^(qemu|lxc)/(\d+)$", s)
if m: return ("vm" if m.group(1) == "qemu" else "ct") + f":{m.group(2)}"
return s
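# Examples: norm_sid("qemu/101") -> "vm:101", norm_sid("lxc/123") -> "ct:123",
# norm_sid("vm:101") -> "vm:101"; any other string is returned unchanged.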
def cluster_vmct_index() -> Dict[str, str]:
items = get_json(["pvesh", "get", "/cluster/resources"]) or []
idx: Dict[str, str] = {}
for it in items:
t = it.get("type")
if t not in ("qemu", "lxc"): continue
vmid = it.get("vmid"); node = it.get("node")
sid = (("vm" if t=="qemu" else "ct") + f":{vmid}") if vmid is not None else norm_sid(it.get("id"))
if sid and node: idx[sid] = node
return idx
def cluster_vmct_meta() -> Dict[str, Dict[str, Any]]:
items = get_json(["pvesh", "get", "/cluster/resources"]) or []
meta: Dict[str, Dict[str, Any]] = {}
for it in items:
t = it.get("type")
if t not in ("qemu", "lxc"): continue
sid = norm_sid(it.get("id")) or (("vm" if t == "qemu" else "ct") + f":{it.get('vmid')}")
if sid:
meta[sid] = {
"sid": sid, "type": t, "vmid": it.get("vmid"),
"node": it.get("node"), "name": it.get("name"),
"status": it.get("status"), "hastate": it.get("hastate")
}
return meta
def merge_resources(api_res: List[Dict[str, Any]],
parsed_res: List[Dict[str, Any]],
vmct_idx: Dict[str, str]) -> List[Dict[str, Any]]:
by_sid: Dict[str, Dict[str, Any]] = {}
for r in (api_res or []):
sid = norm_sid(r.get("sid"))
if not sid: continue
x = dict(r); x["sid"] = sid; by_sid[sid] = x
for r in (parsed_res or []):
sid = norm_sid(r.get("sid"))
if not sid: continue
x = by_sid.get(sid, {"sid": sid})
for k, v in r.items():
if k == "sid": continue
if v not in (None, ""): x[k] = v
by_sid[sid] = x
for sid, x in by_sid.items():
if not x.get("node") and sid in vmct_idx:
x["node"] = vmct_idx[sid]
return list(by_sid.values())
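# Merge precedence: API records seed the map, non-empty fields parsed from
# `ha-manager status` overwrite them, and the /cluster/resources index only fills
# in a node that is still missing.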
# ---------------- VM details ----------------
def sid_to_tuple(sid: str, meta: Dict[str, Dict[str, Any]]) -> Optional[Tuple[str, int, str]]:
sid_n = norm_sid(sid)
if not sid_n: return None
m = re.match(r"^(vm|ct):(\d+)$", sid_n)
if not m: return None
typ = "qemu" if m.group(1) == "vm" else "lxc"
vmid = int(m.group(2))
node = (meta.get(sid_n) or {}).get("node")
return (typ, vmid, node)
def vm_detail_payload(sid: str) -> Dict[str, Any]:
meta = cluster_vmct_meta()
tup = sid_to_tuple(sid, meta)
if not tup: return {"sid": sid, "error": "bad sid"}
typ, vmid, node = tup
if not node: return {"sid": sid, "error": "unknown node"}
base = f"/nodes/{node}/{typ}/{vmid}"
current = get_json(["pvesh", "get", f"{base}/status/current"]) or {}
config = get_json(["pvesh", "get", f"{base}/config"]) or {}
agent_info = None; agent_os = None; agent_ifaces = None
if typ == "qemu":
agent_info = get_json(["pvesh", "get", f"{base}/agent/info"])
agent_os = post_json(["pvesh", "create", f"{base}/agent/get-osinfo"]) or None
agent_ifaces = post_json(["pvesh", "create", f"{base}/agent/network-get-interfaces"]) or None
return {
"sid": norm_sid(sid), "node": node, "type": typ, "vmid": vmid,
"meta": meta.get(norm_sid(sid), {}),
"current": current, "config": config,
"agent": {"info": agent_info, "osinfo": agent_os, "ifaces": agent_ifaces} if typ=="qemu" else None
}
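# Illustrative flow for vm_detail_payload("vm:101"): the owning node comes from
# /cluster/resources, then /nodes/<node>/qemu/101/status/current and .../config are
# fetched; for QEMU guests the agent info / get-osinfo / network-get-interfaces
# calls are attempted as well.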
# ---------------- Node details ----------------
def node_detail_payload(name: str) -> Dict[str, Any]:
if not name: return {"error": "no node"}
status = get_json(["pvesh", "get", f"/nodes/{name}/status"]) or {}
version = get_json(["pvesh", "get", f"/nodes/{name}/version"]) or {}
timeinfo = get_json(["pvesh", "get", f"/nodes/{name}/time"]) or {}
services = get_json(["pvesh", "get", f"/nodes/{name}/services"]) or []
network_cfg = get_json(["pvesh", "get", f"/nodes/{name}/network"]) or []
netstat = get_json(["pvesh", "get", f"/nodes/{name}/netstat"]) or []
disks = get_json(["pvesh", "get", f"/nodes/{name}/disks/list"]) or []
subscription = get_json(["pvesh", "get", f"/nodes/{name}/subscription"]) or {}
return {"node": name,"status": status,"version": version,"time": timeinfo,"services": services,
"network_cfg": network_cfg,"netstat": netstat,"disks": disks,"subscription": subscription}
def node_ha_services(node: str) -> Dict[str, str]:
svcs = get_json(["pvesh", "get", f"/nodes/{node}/services"]) or []
def one(name: str) -> str:
for s in svcs:
if s.get("name") == name:
st = s.get("state") or s.get("active") or ""
return "active" if str(st).lower() in ("running", "active") else (st or "inactive")
return ""
return {"crm_state": one("pve-ha-crm"), "lrm_state": one("pve-ha-lrm")}
def units_for_node(node: str) -> Dict[str, str]:
wanted = {"watchdog-mux", "pve-ha-crm", "pve-ha-lrm"}
svc = get_json(["pvesh", "get", f"/nodes/{node}/services"]) or []
states: Dict[str, str] = {}
def norm_state(s: dict) -> str:
raw_active = str((s.get("active","") or s.get("active-state","") or s.get("ActiveState","") or s.get("activestate",""))).lower()
status = str(s.get("status","")).lower()
substate = str(s.get("substate","")).lower()
state = str(s.get("state","")).lower()
any_active = (raw_active in ("active","running","1","true") or status in ("active","running") or substate in ("running","active") or ("running" in state or "active" in state))
return "active" if any_active else "inactive"
for s in svc:
name_raw = (s.get("name") or ""); name = re.sub(r"\.service$","",name_raw)
if name in wanted: states[name] = norm_state(s)
for u in wanted:
if states.get(u) != "active" and is_active(u):
states[u] = "active"
for u in wanted: states.setdefault(u, "inactive")
return states
# ---------------- snapshot ----------------
def status_snapshot(node: str) -> Dict[str, Any]:
vq = votequorum_brief()
api = api_cluster_data()
api["nodes"] = enrich_nodes(api.get("nodes", []))
ha_raw = get_ha_status_raw().strip()
parsed = parse_ha_manager(ha_raw)
vmct_ix = cluster_vmct_index()
ha_status = api.get("ha_status") or []
if not ha_status and parsed.get("nodes"):
ha_status = [{
"node": n.get("node"), "state": n.get("state"),
"crm_state": n.get("crm",""), "lrm_state": n.get("lrm","")
} for n in parsed["nodes"]]
if not ha_status:
for it in api.get("cluster_status", []):
if it.get("type") == "node":
ha_status.append({"node": it.get("name"),"state": "online" if it.get("online") else "offline","crm_state":"", "lrm_state":""})
enriched = []
for n in ha_status:
node_name = n.get("node"); crm = n.get("crm_state") or ""; lrm = n.get("lrm_state") or ""
if node_name and (not crm or not lrm):
try:
svc = node_ha_services(node_name)
if not crm: n["crm_state"] = svc.get("crm_state","")
if not lrm: n["lrm_state"] = svc.get("lrm_state","")
except Exception:
pass
enriched.append(n)
api["ha_status"] = enriched
api["ha_resources"] = merge_resources(api.get("ha_resources", []), parsed.get("resources", []), vmct_ix)
units = units_for_node(node or socket.gethostname())
return {"node_arg": node, "hostname": socket.gethostname(), "votequorum": vq, "units": units,
"cfgtool": get_cfgtool().strip(), "pvecm": get_pvecm_status().strip(),
"ha_raw": ha_raw, "replication": get_pvesr_status().strip(), "api": api, "ts": int(time.time())}
# ---------------- web ----------------
@app.get("/")
def index():
node = request.args.get("node", DEFAULT_NODE)
return render_template("index.html", title=APP_TITLE, node=node)
# Legacy aggregate snapshot endpoint (kept for compatibility)
@app.get("/api/info")
def api_info():
node = request.args.get("node", DEFAULT_NODE)
return jsonify(status_snapshot(node))
# --- New lighter endpoints (faster page load) ---
@app.get("/api/cluster")
def api_cluster_brief():
vq = votequorum_brief()
api = api_cluster_data()
ha_raw = get_ha_status_raw().strip()
parsed = parse_ha_manager(ha_raw)
vmct_ix = cluster_vmct_index()
api["ha_resources"] = merge_resources(api.get("ha_resources", []), parsed.get("resources", []), vmct_ix)
return jsonify({
"votequorum": vq,
"cluster_status": api.get("cluster_status", []),
"ha_resources": api.get("ha_resources", []),
"pvecm": get_pvecm_status().strip(),
"cfgtool": get_cfgtool().strip(),
"hostname": socket.gethostname(),
"ts": int(time.time())
})
@app.get("/api/nodes/summary")
def api_nodes_summary():
nodes = enrich_nodes((api_cluster_data().get("nodes") or []))
return jsonify({ "nodes": nodes })
@app.get("/api/units")
def api_units():
node = request.args.get("node", DEFAULT_NODE)
return jsonify({ "units": units_for_node(node) })
# Replication jobs from all nodes
@app.get("/api/replication/all")
def api_replication_all():
jobs: List[Dict[str, Any]] = []
nodes = [n.get("node") for n in (api_cluster_data().get("nodes") or []) if n.get("node")]
for name in nodes:
data = get_json(["pvesh", "get", f"/nodes/{name}/replication"]) or get_json(["pvesh","get",f"/nodes/{name}/replication/jobs"]) or []
for it in (data or []):
jobs.append({
"node": name,
"job": it.get("id") or it.get("job") or "",
"enabled": "yes" if (it.get("enable",1) in (1, "1", True)) else "no",
"target": it.get("target") or it.get("target-node") or "",
"last": it.get("last_sync") or it.get("last_sync", ""),
"next": it.get("next_sync") or it.get("next_sync", ""),
"dur": it.get("duration") or it.get("duration", ""),
"fail": int(it.get("fail_count") or it.get("failcount") or 0),
"state": it.get("state") or it.get("status") or ""
})
return jsonify({ "jobs": jobs })
# --- Detale ---
@app.get("/api/vm")
def api_vm_detail():
sid = request.args.get("sid", "")
return jsonify(vm_detail_payload(sid))
@app.get("/api/node")
def api_node_detail():
name = request.args.get("name", "")
return jsonify(node_detail_payload(name))
@app.get("/api/list-vmct")
def api_list_vmct():
meta = cluster_vmct_meta()
ha_sids = {norm_sid(r.get("sid")) for r in (api_cluster_data().get("ha_resources") or []) if r.get("sid")}
nonha = [v for k, v in meta.items() if k not in ha_sids]
return jsonify({"nonha": nonha, "ha_index": list(ha_sids), "count_nonha": len(nonha), "count_all_vmct": len(meta)})
# --- Enable/Disable maintenance ---
@app.post("/api/enable")
def api_enable():
if os.geteuid() != 0:
return jsonify(ok=False, error="run as root"), 403
data = request.get_json(force=True, silent=True) or {}
node = data.get("node") or DEFAULT_NODE
log: List[str] = []
ha_node_maint(True, node, log)
for u in HA_UNITS_STOP: stop_if_running(u, log)
return jsonify(ok=True, log=log)
@app.post("/api/disable")
def api_disable():
if os.geteuid() != 0:
return jsonify(ok=False, error="run as root"), 403
data = request.get_json(force=True, silent=True) or {}
node = data.get("node") or DEFAULT_NODE
log: List[str] = []
for u in HA_UNITS_START: start_if_needed(u, log)
ha_node_maint(False, node, log)
return jsonify(ok=True, log=log)
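# Illustrative usage (both endpoints require the process to run as root):
#   POST /api/enable  {"node": "pve1"}  -> enable HA maintenance, then stop
#                                          pve-ha-lrm, pve-ha-crm and watchdog-mux
#   POST /api/disable {"node": "pve1"}  -> start the units again, then lift maintenance
# Each responds with {"ok": true, "log": [...]} describing the actions taken.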
# --- VM/CT admin actions API (returns a UPID for live tailing) ---
def vm_locked(typ: str, node: str, vmid: int) -> bool:
base = f"/nodes/{node}/{typ}/{vmid}"
cur = get_json(["pvesh", "get", f"{base}/status/current"]) or {}
return bool(cur.get("lock"))
def run_qm_pct(typ: str, vmid: int, subcmd: str) -> subprocess.CompletedProcess:
tool = "qm" if typ == "qemu" else "pct"
return run([tool, subcmd, str(vmid)], timeout=120)
@app.get("/api/list-all-vmct")
def api_list_all_vmct():
meta = cluster_vmct_meta()
nodes = [n.get("node") for n in (api_cluster_data().get("nodes") or []) if n.get("node")]
return jsonify({"all": list(meta.values()), "nodes": sorted(set(nodes))})
@app.post("/api/vm-action")
def api_vm_action():
if os.geteuid() != 0:
return jsonify(ok=False, error="run as root"), 403
data = request.get_json(force=True, silent=True) or {}
sid = data.get("sid", "")
action = data.get("action", "")
target = data.get("target", "")
meta = cluster_vmct_meta()
tup = sid_to_tuple(sid, meta)
if not tup:
return jsonify(ok=False, error="bad sid"), 400
typ, vmid, node = tup
if not node:
return jsonify(ok=False, error="unknown node"), 400
try:
if action == "unlock":
            # no UPID (takes effect immediately)
if typ != "qemu":
return jsonify(ok=False, error="unlock only for qemu"), 400
if not vm_locked(typ, node, vmid):
return jsonify(ok=True, msg="not locked")
r = run(["qm", "unlock", str(vmid)])
return jsonify(ok=(r.returncode == 0), stdout=r.stdout, stderr=r.stderr, upid=None, source_node=node)
elif action in ("start", "stop", "shutdown"):
            # PVE API -> returns a UPID
base = f"/nodes/{node}/{typ}/{vmid}/status/{action}"
res = post_json(["pvesh", "create", base], timeout=120) or {}
upid = None
if isinstance(res, dict):
upid = res.get("data") or res.get("upid")
if not upid:
                # fall back to qm/pct (no UPID), but still report ok
r = run_qm_pct(typ, vmid, action)
return jsonify(ok=(r.returncode == 0), stdout=r.stdout, stderr=r.stderr, upid=None, source_node=node)
return jsonify(ok=True, result=res, upid=upid, source_node=node)
elif action == "migrate" or action == "migrate_offline":
if action == "migrate_offline": # zgodność wstecz
action = "migrate"
if not target or target == node:
return jsonify(ok=False, error="target required and must differ from source"), 400
base = f"/nodes/{node}/{typ}/{vmid}/migrate"
res = post_json(["pvesh", "create", base, "-target", target, "-online", "0"], timeout=120) or {}
upid = None
if isinstance(res, dict):
upid = res.get("data") or res.get("upid")
return jsonify(ok=True, result=res, upid=upid, source_node=node)
else:
return jsonify(ok=False, error="unknown action"), 400
except Exception as e:
return jsonify(ok=False, error=str(e)), 500
@app.get("/api/task-status")
def api_task_status():
upid = request.args.get("upid", "").strip()
node = request.args.get("node", "").strip()
if not upid or not node:
return jsonify(ok=False, error="upid and node required"), 400
st = get_json(["pvesh", "get", f"/nodes/{node}/tasks/{upid}/status"]) or {}
return jsonify(ok=True, status=st)
@app.get("/api/task-log")
def api_task_log():
upid = request.args.get("upid", "").strip()
node = request.args.get("node", "").strip()
start = request.args.get("start", "0").strip()
try:
start_i = int(start)
except Exception:
start_i = 0
if not upid or not node:
return jsonify(ok=False, error="upid and node required"), 400
lines = get_json(["pvesh", "get", f"/nodes/{node}/tasks/{upid}/log", "-start", str(start_i)]) or []
next_start = start_i
if isinstance(lines, list) and lines:
try:
next_start = max((int(x.get("n", start_i)) for x in lines if isinstance(x, dict)), default=start_i) + 1
except Exception:
next_start = start_i
return jsonify(ok=True, lines=lines or [], next_start=next_start)
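# Illustrative polling loop for a live task log: request
#   GET /api/task-log?upid=<UPID>&node=<node>&start=0
# render the returned lines, then repeat with start=<next_start> until
# /api/task-status reports status "stopped".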
# ---------------- WebSocket: live task tail ----------------
@sock.route("/ws/task")
def ws_task(ws):
# query: upid, node
q = ws.environ.get("QUERY_STRING", "")
params = {}
for part in q.split("&"):
if not part: continue
k, _, v = part.partition("=")
params[k] = v
upid = params.get("upid", "").strip()
node = params.get("node", "").strip()
if not upid or not node:
ws.send(json.dumps({"type":"error","error":"upid and node are required"}))
return
start = 0
try:
while True:
st = get_json(["pvesh", "get", f"/nodes/{node}/tasks/{upid}/status"]) or {}
ws.send(json.dumps({"type":"status","status":st}))
lines = get_json(["pvesh", "get", f"/nodes/{node}/tasks/{upid}/log", "-start", str(start)]) or []
if isinstance(lines, list) and lines:
for ln in lines:
txt = (ln.get("t") if isinstance(ln, dict) else None)
if txt:
ws.send(json.dumps({"type":"log","line":txt}))
try:
start = max((int(x.get("n", start)) for x in lines if isinstance(x, dict)), default=start) + 1
except Exception:
pass
            # task finished -> send the final message and stop
if isinstance(st, dict) and (str(st.get("status","")).lower() == "stopped" or st.get("exitstatus")):
ok = (str(st.get("exitstatus","")).upper() == "OK")
ws.send(json.dumps({"type":"done","ok":ok,"exitstatus":st.get("exitstatus")}))
break
time.sleep(1.2)
except Exception:
try:
ws.close()
except Exception:
pass
# ---------------- WebSocket: broadcast observe per sid ----------------
@sock.route("/ws/observe")
def ws_observe(ws):
"""
    Streams VM/CT state changes and active tasks for the given SID.
    Query: sid=vm:101 (or ct:123)
"""
q = ws.environ.get("QUERY_STRING", "")
params = {}
for part in q.split("&"):
if not part: continue
k, _, v = part.partition("="); params[k] = v
sid = (params.get("sid") or "").strip()
if not sid:
ws.send(json.dumps({"type":"error","error":"sid required"})); return
meta = cluster_vmct_meta()
tup = sid_to_tuple(sid, meta)
if not tup:
ws.send(json.dumps({"type":"error","error":"unknown sid"})); return
typ, vmid, node = tup
if not node:
ws.send(json.dumps({"type":"error","error":"could not resolve node"})); return
last_hash = None
seen_upids = set()
try:
while True:
base = f"/nodes/{node}/{typ}/{vmid}"
cur = get_json(["pvesh", "get", f"{base}/status/current"]) or {}
            # send the current state only when it changed
cur_hash = json.dumps(cur, sort_keys=True)
if cur_hash != last_hash:
last_hash = cur_hash
ws.send(json.dumps({"type":"vm","current":cur,"meta":{"sid":norm_sid(sid),"node":node,"typ":typ,"vmid":vmid}}))
tasks = get_json(["pvesh","get",f"/nodes/{node}/tasks","-limit","50"]) or []
for t in tasks:
                upid = t.get("upid") or t.get("pstart")  # upid must be a string
                tid = (t.get("id") or "")  # e.g. "qemu/101" or "lxc/123"
if not isinstance(upid, str): continue
if (str(vmid) in tid) or (f"{'qemu' if typ=='qemu' else 'lxc'}/{vmid}" in tid):
                    # detailed task status
st = get_json(["pvesh","get",f"/nodes/{node}/tasks/{upid}/status"]) or {}
ev = {"type":"task","upid":upid,"status":st.get("status"),"exitstatus":st.get("exitstatus"),"node":node}
ws.send(json.dumps(ev))
                    # newly seen "running" task -> announce it once
if upid not in seen_upids and (str(st.get("status","")).lower() != "stopped"):
seen_upids.add(upid)
ws.send(json.dumps({"type":"task-start","upid":upid,"node":node}))
                    # task finished
if str(st.get("status","")).lower() == "stopped" or st.get("exitstatus"):
ws.send(json.dumps({"type":"done","upid":upid,"ok":str(st.get('exitstatus','')).upper()=='OK'}))
time.sleep(2.0)
except Exception:
try: ws.close()
except Exception: pass
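# Message shapes sent over /ws/observe (matching the json.dumps calls above):
#   {"type": "vm",   "current": {...}, "meta": {...}}     - only when status/current changed
#   {"type": "task-start", "upid": "...", "node": "..."}  - a running task seen for the first time
#   {"type": "task", "upid": "...", "status": "...", "exitstatus": ..., "node": "..."}
#   {"type": "done", "upid": "...", "ok": true|false}
#   {"type": "error", "error": "..."}                     - bad or unresolvable query parameters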
if __name__ == "__main__":
import argparse
p = argparse.ArgumentParser()
p.add_argument("--bind", default="127.0.0.1:8007", help="addr:port")
p.add_argument("--node", default=DEFAULT_NODE, help="default node")
args = p.parse_args()
DEFAULT_NODE = args.node
host, port = args.bind.split(":")
app.run(host=host, port=int(port), debug=False, threaded=True)
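# Illustrative start command (bind address is the argparse default, node name is a
# placeholder):
#   python3 /opt/pve-ha-web/app.py --bind 127.0.0.1:8007 --node pve1
# flask-sock can serve the /ws/* routes from this threaded development server.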