"""Push schedule-based blocking rules to one or more filtering servers.

The script reads INI configuration (``server*`` and ``client:*`` sections),
decides per client whether its schedule is active for the current hour, and
posts the resulting rule list to every reachable server's
``/control/filtering/set_rules`` endpoint using HTTP basic auth.
"""

import configparser
import os
import shlex
import subprocess
import sys
from datetime import datetime
from glob import glob

import requests
from requests.auth import HTTPBasicAuth

REQUIRED_SERVER_KEYS = {"url", "username", "password"}
REQUIRED_CLIENT_KEYS = {"ip", "services", "schedule"}

# Upper bound (seconds) for every HTTP call, so an unresponsive server can
# never hang an hourly cron run.
REQUEST_TIMEOUT = 5


def should_block(schedule, now=None):
    """Return True when *schedule* says blocking is active.

    Supported schedule values:

    * ``"whole"`` -- always active.
    * ``"custom:START-END"`` -- active while ``START <= hour < END``; when
      ``START > END`` the window wraps around midnight.

    Any other or malformed value yields False.

    Args:
        schedule: Schedule string taken from the client configuration.
        now: Hour of the day to evaluate against. Defaults to the current
            local hour (``datetime.now().hour``).
    """
    if schedule == "whole":
        return True
    if not schedule.startswith("custom:"):
        return False

    try:
        hours = schedule.split(":")[1]
        start, end = map(int, hours.split("-"))
    except ValueError:
        # Non-numeric bounds or a wrong number of "-" separated parts.
        return False

    if now is None:
        now = datetime.now().hour
    if start <= end:
        return start <= now < end
    # Window wraps past midnight, e.g. 22-6.
    return now >= start or now < end


def show_block_details(ip, service, schedule):
    """Print one line describing a single active block rule."""
    print(f"Blokada: IP={ip} | Serwis={service} | Harmonogram={schedule}")


def install_cron():
    """Add an hourly crontab entry that runs this script with ``--cron``.

    The entry points at ``config.ini`` located next to this script. Nothing
    is changed when the exact entry is already present.

    Returns:
        True when the entry exists or was installed, False when writing the
        crontab failed.
    """
    python_exec = sys.executable
    script_path = os.path.abspath(__file__)
    config_path = os.path.join(os.path.dirname(script_path), "config.ini")
    # Quote every path: cron hands the command to a shell, so a path that
    # contains spaces would otherwise be split into several arguments.
    command = " ".join(
        shlex.quote(part) for part in (python_exec, script_path, config_path)
    )
    cron_line = f"0 * * * * {command} --cron > /dev/null 2>&1"

    result = subprocess.run(["crontab", "-l"], capture_output=True, text=True)
    # A non-zero exit is treated as "no crontab yet".
    existing_cron = result.stdout if result.returncode == 0 else ""

    if cron_line in existing_cron:
        print("Cron już istnieje.")
        return True

    existing = existing_cron.strip()
    # Avoid a leading blank line when the crontab was empty.
    updated_cron = f"{existing}\n{cron_line}\n" if existing else f"{cron_line}\n"
    completed = subprocess.run(["crontab", "-"], input=updated_cron, text=True)
    if completed.returncode != 0:
        print("Nie udało się zaktualizować crona.", file=sys.stderr)
        return False

    print("Dodano do crona.")
    return True


def print_help():
    """Print command-line usage information."""
    print("Użycie:")
    print(" python adguard_scheduler.py [config.ini] [--config-dir DIR] [--install-cron|--cron|--help|-h]")
    print("\nOpcje:")
    print(" --install-cron Dodaje zadanie do crona co godzinę")
    print(" --cron Uruchomienie bez outputu (do crona)")
    print(" --config-dir DIR Wczytaj wszystkie pliki .ini z katalogu DIR")
    print(" --help, -h Pokazuje pomoc")
    print("\nPrzykłady:")
    print(" python adguard_scheduler.py")
    print(" python adguard_scheduler.py /ścieżka/config.ini")
    print(" python adguard_scheduler.py --config-dir /etc/adguard_configs")
    print(" python adguard_scheduler.py --install-cron")
    print(" python adguard_scheduler.py /cfg.ini --cron")


def validate_config(config):
    """Check that every server/client section has its required keys.

    Args:
        config: Parsed ``configparser.ConfigParser`` instance.

    Raises:
        ValueError: A ``server*`` or ``client:*`` section lacks a required
            key.
    """
    for section in config.sections():
        keys = set(config[section].keys())
        if section.startswith("server") and not REQUIRED_SERVER_KEYS.issubset(keys):
            raise ValueError(f"Błędna konfiguracja w {section}: wymagane {REQUIRED_SERVER_KEYS}")
        if section.startswith("client:") and not REQUIRED_CLIENT_KEYS.issubset(keys):
            raise ValueError(f"Błędna konfiguracja w {section}: wymagane {REQUIRED_CLIENT_KEYS}")


def load_configs(files):
    """Merge the given INI files into one validated configuration.

    Paths that are not existing files are skipped. When the same section
    appears in several files, the one read last replaces the earlier one.

    Args:
        files: Iterable of INI file paths, read in the given order.

    Returns:
        The merged ``configparser.ConfigParser``.

    Raises:
        ValueError: Validation of the merged configuration failed.
    """
    merged = configparser.ConfigParser()
    for file in files:
        if not os.path.isfile(file):
            continue
        conf = configparser.ConfigParser()
        conf.read(file)
        for section in conf.sections():
            merged[section] = conf[section]
    validate_config(merged)
    return merged


def test_server_connection(url, auth):
    """Return True when ``GET {url}/control/status`` answers with HTTP 200.

    Any request-level failure (connection error, timeout, invalid URL) is
    reported as False rather than raised.
    """
    try:
        r = requests.get(f"{url}/control/status", auth=auth, timeout=REQUEST_TIMEOUT)
    except requests.RequestException:
        return False
    return r.status_code == 200


def _parse_services(raw):
    """Split a comma-separated service list, dropping blank entries."""
    # A blank entry (trailing or doubled comma) would otherwise become the
    # rule "||^$client=IP", which names no service at all.
    return [name.strip() for name in raw.split(",") if name.strip()]


def _collect_active_blocks(clients):
    """Return ``(ip, service, schedule)`` for every currently active block."""
    return [
        (cdata["ip"], service, cdata["schedule"])
        for cdata in clients.values()
        if should_block(cdata["schedule"])
        for service in cdata["services"]
    ]


def main(config_paths, silent=False):
    """Load the configuration and push the active rules to every server.

    Args:
        config_paths: INI file paths passed on to ``load_configs``.
        silent: When True, suppress all progress and error output.

    Raises:
        ValueError: The configuration failed validation.
    """
    config = load_configs(config_paths)

    servers = {
        section: {
            "url": config.get(section, "url").rstrip("/"),
            "username": config.get(section, "username"),
            "password": config.get(section, "password"),
        }
        for section in config.sections()
        if section.startswith("server")
    }
    clients = {
        section: {
            "ip": config.get(section, "ip"),
            "services": _parse_services(config.get(section, "services")),
            "schedule": config.get(section, "schedule"),
        }
        for section in config.sections()
        if section.startswith("client:")
    }

    if not servers:
        if not silent:
            print("Brak skonfigurowanych serwerów.")
        return

    # The rules depend only on the clients and the current hour, so build
    # them once; every server then receives the same list even if the run
    # crosses an hour boundary.
    active_blocks = _collect_active_blocks(clients)
    rules = sorted(
        {f"||{service}^$client={ip}" for ip, service, _ in active_blocks}
    )
    payload = {"rules": rules}

    for srv_name, srv in servers.items():
        if not silent:
            print(f"--- Przetwarzanie serwera {srv_name} ---")
        auth = HTTPBasicAuth(srv["username"], srv["password"])

        if not test_server_connection(srv["url"], auth):
            if not silent:
                print(f"[{srv['url']}] Niedostępny / brak połączenia.")
            continue

        if not silent:
            for ip, service, schedule in active_blocks:
                show_block_details(ip, service, schedule)

        # NOTE(review): the endpoint name suggests the posted list replaces
        # the server's existing custom rules -- confirm against the API docs.
        endpoint = f"{srv['url']}/control/filtering/set_rules"
        try:
            response = requests.post(
                endpoint,
                json=payload,
                auth=auth,
                headers={"Content-Type": "application/json"},
                timeout=REQUEST_TIMEOUT,
            )
        except requests.RequestException as e:
            if not silent:
                print(f"[{srv['url']}] Błąd: {e}")
            continue

        if not silent:
            if response.status_code in [200, 204]:
                print(f"[{srv['url']}] Reguły zaktualizowane.")
            else:
                print(f"[{srv['url']}] Błąd HTTP: {response.status_code} {response.text}")


def _run_cli(argv):
    """Interpret command-line arguments and return the process exit code."""
    if "--help" in argv or "-h" in argv:
        print_help()
        return 0

    if "--install-cron" in argv:
        return 0 if install_cron() else 1

    silent = "--cron" in argv

    if "--config-dir" in argv:
        dir_index = argv.index("--config-dir") + 1
        if dir_index >= len(argv):
            print("Brak ścieżki po --config-dir")
            return 1
        # Sorted so that the file winning a duplicated section is
        # deterministic (glob order is arbitrary).
        config_files = sorted(glob(os.path.join(argv[dir_index], "*.ini")))
    else:
        positional = [a for a in argv if not a.startswith("--")]
        if positional:
            config_files = [positional[0]]
        else:
            config_files = [os.path.join(os.path.dirname(__file__), "config.ini")]

    try:
        main(config_files, silent=silent)
    except (ValueError, configparser.Error) as exc:
        # Report configuration problems as a message instead of a traceback.
        print(f"Błąd konfiguracji: {exc}", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(_run_cli(sys.argv[1:]))