refator_comm1

This commit is contained in:
Mateusz Gruszczyński
2025-10-18 21:28:42 +02:00
parent 3a90a48109
commit e3b3ff235b
2 changed files with 68 additions and 45 deletions

64
app.py
View File

@@ -586,21 +586,22 @@ def ws_task(ws):
# ---------------- WebSocket: broadcast observe per sid ----------------
@sock.route("/ws/observe")
def ws_observe(ws):
"""
Streamuje zmiany stanu VM/CT i aktywne taski dla danego SID.
Query: sid=vm:101 (albo ct:123)
"""
q = ws.environ.get("QUERY_STRING", "")
params = {}
for part in q.split("&"):
if not part: continue
k, _, v = part.partition("="); params[k] = v
sid = (params.get("sid") or "").strip()
sid_raw = (params.get("sid") or "").strip()
sid = norm_sid(sid_raw)
if not sid:
ws.send(json.dumps({"type":"error","error":"sid required"})); return
meta = cluster_vmct_meta()
tup = sid_to_tuple(sid, meta)
# Resolve tuple + node
def resolve_tuple() -> Optional[Tuple[str,int,str]]:
meta = cluster_vmct_meta()
return sid_to_tuple(sid, meta)
tup = resolve_tuple()
if not tup:
ws.send(json.dumps({"type":"error","error":"unknown sid"})); return
typ, vmid, node = tup
@@ -609,35 +610,44 @@ def ws_observe(ws):
last_hash = None
seen_upids = set()
prev_node = node
try:
while True:
ntup = resolve_tuple()
if ntup:
_, _, cur_node = ntup
if cur_node and cur_node != node:
ws.send(json.dumps({"type":"moved","old_node":node,"new_node":cur_node,"meta":{"sid":sid,"vmid":vmid,"typ":typ}}))
prev_node, node = node, cur_node
base = f"/nodes/{node}/{typ}/{vmid}"
cur = get_json(["pvesh", "get", f"{base}/status/current"]) or {}
# wyślij tylko gdy zmiana
cur_hash = json.dumps(cur, sort_keys=True)
if cur_hash != last_hash:
last_hash = cur_hash
ws.send(json.dumps({"type":"vm","current":cur,"meta":{"sid":norm_sid(sid),"node":node,"typ":typ,"vmid":vmid}}))
ws.send(json.dumps({"type":"vm","current":cur,"meta":{"sid":sid,"node":node,"typ":typ,"vmid":vmid}}))
tasks = get_json(["pvesh","get",f"/nodes/{node}/tasks","-limit","50"]) or []
for t in tasks:
upid = t.get("upid") or t.get("pstart") # upid musi być stringiem
tid = (t.get("id") or "") # np. "qemu/101" lub "lxc/123"
if not isinstance(upid, str): continue
if (str(vmid) in tid) or (f"{'qemu' if typ=='qemu' else 'lxc'}/{vmid}" in tid):
# status szczegółowy
st = get_json(["pvesh","get",f"/nodes/{node}/tasks/{upid}/status"]) or {}
ev = {"type":"task","upid":upid,"status":st.get("status"),"exitstatus":st.get("exitstatus"),"node":node}
ws.send(json.dumps(ev))
# nowy "running" task → ogłoś raz
if upid not in seen_upids and (str(st.get("status","")).lower() != "stopped"):
seen_upids.add(upid)
ws.send(json.dumps({"type":"task-start","upid":upid,"node":node}))
# zakończenie
if str(st.get("status","")).lower() == "stopped" or st.get("exitstatus"):
ws.send(json.dumps({"type":"done","upid":upid,"ok":str(st.get('exitstatus','')).upper()=='OK'}))
nodes_to_scan = [node] + ([prev_node] if prev_node and prev_node != node else [])
for nX in nodes_to_scan:
tasks = get_json(["pvesh","get",f"/nodes/{nX}/tasks","-limit","50"]) or []
for t in tasks:
upid = t.get("upid") if isinstance(t, dict) else None
tid = (t.get("id") or "") if isinstance(t, dict) else ""
if not upid or not isinstance(upid, str): continue
# dopasuj po vmid lub ciągu qemu/<vmid>|lxc/<vmid>
if (str(vmid) in tid) or (f"{'qemu' if typ=='qemu' else 'lxc'}/{vmid}" in tid):
st = get_json(["pvesh","get",f"/nodes/{nX}/tasks/{upid}/status"]) or {}
ws.send(json.dumps({"type":"task","upid":upid,"status":st.get("status"),"exitstatus":st.get("exitstatus"),"node":nX}))
# nowy running
if upid not in seen_upids and str(st.get("status","")).lower() != "stopped":
seen_upids.add(upid)
ws.send(json.dumps({"type":"task-start","upid":upid,"node":nX}))
# zakończone
if str(st.get("status","")).lower() == "stopped" or st.get("exitstatus"):
ws.send(json.dumps({"type":"done","upid":upid,"ok":str(st.get('exitstatus','')).upper()=='OK',"node":nX}))
time.sleep(2.0)
time.sleep(1.8)
except Exception:
try: ws.close()
except Exception: pass