Files
adguard_blocker/adguard_scheduler.py
2025-06-11 14:01:26 +02:00

176 lines
6.1 KiB
Python

import sys
import os
import configparser
import requests
from requests.auth import HTTPBasicAuth
import subprocess
from glob import glob
from datetime import datetime
# Options that every config section whose name starts with "server" must define.
REQUIRED_SERVER_KEYS = {"url", "username", "password"}
# Options that every config section whose name starts with "client:" must define.
REQUIRED_CLIENT_KEYS = {"ip", "services", "schedule"}
def should_block(schedule, now=None):
    """Decide whether a client's schedule calls for blocking at a given hour.

    Supported schedule values:

    * ``"whole"`` - block at all times.
    * ``"custom:START-END"`` - block while the hour lies in the half-open
      range ``[START, END)``. When ``START > END`` the window wraps past
      midnight (e.g. ``custom:22-6`` covers hours 22, 23 and 0-5).

    Any other value, or a ``custom:`` value that cannot be parsed as exactly
    two integers, means "do not block".

    Args:
        schedule: Schedule string taken from the client configuration.
        now: Hour of the day to evaluate against. Defaults to the current
            local hour; passing it explicitly makes the function testable.

    Returns:
        True if blocking should be active, False otherwise.
    """
    if schedule == "whole":
        return True
    if not schedule.startswith("custom:"):
        return False
    try:
        # "custom:8-16" -> "8-16" -> (8, 16)
        start, end = map(int, schedule.split(":")[1].split("-"))
    except ValueError:
        # Raised for non-numeric hours and for a wrong number of
        # "-"-separated parts; both are treated as "never block".
        return False
    if now is None:
        now = datetime.now().hour
    if start <= end:
        return start <= now < end
    # Window wraps around midnight.
    return now >= start or now < end
def show_block_details(ip, service, schedule):
    """Print a one-line summary of a single active block.

    Args:
        ip: Client IP address the block applies to.
        service: Name of the blocked service.
        schedule: Schedule string that triggered the block.
    """
    fields = (f"IP={ip}", f"Serwis={service}", f"Harmonogram={schedule}")
    print("Blokada: " + " | ".join(fields))
def install_cron():
    """Register an hourly crontab entry that runs this script in ``--cron`` mode.

    The entry invokes the current interpreter on this file, passing the
    ``config.ini`` located next to it. If an identical entry is already
    present in the user's crontab, nothing is changed. A status message is
    printed in every case, including when ``crontab`` cannot be run or
    rejects the new table.
    """
    import shlex  # local import: only needed when installing the cron job

    python_exec = sys.executable
    script_path = os.path.abspath(__file__)
    config_path = os.path.join(os.path.dirname(script_path), "config.ini")
    # Quote each path so directories containing spaces or shell
    # metacharacters survive the shell that cron uses to run the command.
    command = " ".join(
        shlex.quote(part) for part in (python_exec, script_path, config_path)
    )
    cron_line = f"0 * * * * {command} --cron > /dev/null 2>&1"

    try:
        result = subprocess.run(["crontab", "-l"], capture_output=True, text=True)
    except OSError as exc:
        # E.g. the crontab binary is not installed.
        print(f"Nie można uruchomić crontab: {exc}")
        return

    # A failing "crontab -l" is treated as an empty crontab.
    existing_cron = result.stdout if result.returncode == 0 else ""
    if cron_line in existing_cron:
        print("Cron już istnieje.")
        return

    kept = existing_cron.strip()
    # Avoid a leading blank line when there were no previous entries.
    updated_cron = f"{kept}\n{cron_line}\n" if kept else f"{cron_line}\n"
    write = subprocess.run(["crontab", "-"], input=updated_cron, text=True)
    if write.returncode != 0:
        # Do not claim success when crontab refused the new table.
        print("Nie udało się zaktualizować crona.")
        return
    print("Dodano do crona.")
def print_help():
    """Print the command-line usage text (synopsis, options, examples) to stdout."""
    help_lines = (
        "Użycie:",
        " python adguard_scheduler.py [config.ini] [--config-dir DIR] [--install-cron|--cron|--help|-h]",
        "",
        "Opcje:",
        " --install-cron Dodaje zadanie do crona co godzinę",
        " --cron Uruchomienie bez outputu (do crona)",
        " --config-dir DIR Wczytaj wszystkie pliki .ini z katalogu DIR",
        " --help, -h Pokazuje pomoc",
        "",
        "Przykłady:",
        " python adguard_scheduler.py",
        " python adguard_scheduler.py /ścieżka/config.ini",
        " python adguard_scheduler.py --config-dir /etc/adguard_configs",
        " python adguard_scheduler.py --install-cron",
        " python adguard_scheduler.py /cfg.ini --cron",
    )
    # A single print of the joined text yields the same bytes as printing
    # each line separately.
    print("\n".join(help_lines))
def validate_config(config):
    """Ensure server and client sections define all of their required options.

    Sections whose name starts with ``server`` are checked against
    ``REQUIRED_SERVER_KEYS``; sections whose name starts with ``client:``
    are checked against ``REQUIRED_CLIENT_KEYS``. Other sections are ignored.

    Args:
        config: A ``configparser.ConfigParser`` holding the loaded settings.

    Raises:
        ValueError: On the first section that lacks a required option.
    """
    for name in config.sections():
        present = set(config[name].keys())
        is_server = name.startswith("server")
        if is_server and not present.issuperset(REQUIRED_SERVER_KEYS):
            raise ValueError(f"Błędna konfiguracja w {name}: wymagane {REQUIRED_SERVER_KEYS}")
        is_client = name.startswith("client:")
        if is_client and not present.issuperset(REQUIRED_CLIENT_KEYS):
            raise ValueError(f"Błędna konfiguracja w {name}: wymagane {REQUIRED_CLIENT_KEYS}")
def load_configs(files):
    """Read several INI files into one validated ``ConfigParser``.

    Files are processed in the given order. Each section found in a file is
    assigned into the combined parser under the same name, so a section from
    a later file takes the place of a same-named section from an earlier one.
    Paths that are not existing regular files are skipped without error.

    Args:
        files: Iterable of configuration file paths.

    Returns:
        The combined ``configparser.ConfigParser``.

    Raises:
        ValueError: Propagated from ``validate_config`` when a section is
            missing required options.
    """
    combined = configparser.ConfigParser()
    for path in files:
        if not os.path.isfile(path):
            continue
        parsed = configparser.ConfigParser()
        parsed.read(path)
        for name in parsed.sections():
            combined[name] = parsed[name]
    validate_config(combined)
    return combined
def test_server_connection(url, auth):
    """Check whether the server at *url* answers its status endpoint.

    Args:
        url: Base URL of the server; ``/control/status`` is appended to it.
        auth: Authentication object handed to ``requests.get``.

    Returns:
        True if the request completes with HTTP 200 within 5 seconds;
        False for any other status code or when the request itself fails
        (connection error, timeout, malformed URL, ...).
    """
    try:
        response = requests.get(f"{url}/control/status", auth=auth, timeout=5)
    except requests.RequestException:
        # Only network/HTTP-level failures mean "unreachable"; programming
        # errors and KeyboardInterrupt are no longer swallowed.
        return False
    return response.status_code == 200
def _build_rules(clients, silent):
    """Return the sorted, de-duplicated rules for all clients whose schedule is active."""
    rules = set()
    for client in clients.values():
        if not should_block(client["schedule"]):
            continue
        for service in client["services"]:
            rules.add(f"||{service}^$client={client['ip']}")
            if not silent:
                show_block_details(client["ip"], service, client["schedule"])
    return sorted(rules)


def main(config_paths, silent=False):
    """Load the configuration and push the currently active rules to every server.

    For each ``server*`` section the server is probed first; unreachable
    servers are skipped. For reachable ones, a rule of the form
    ``||<service>^$client=<ip>`` is generated for every service of every
    ``client:*`` section whose schedule is active right now, and the
    resulting list is POSTed to ``/control/filtering/set_rules``.

    NOTE(review): the payload contains only the rules generated here, so the
    endpoint presumably replaces the server's whole custom rule list;
    confirm against the AdGuard Home API before relying on hand-made rules.

    Args:
        config_paths: Iterable of INI file paths passed to ``load_configs``.
        silent: When True, nothing is printed (used for cron runs).

    Raises:
        ValueError: Propagated from ``load_configs`` for invalid sections.
    """
    # Upper bound in seconds for the rule upload, so an unresponsive server
    # cannot hang a cron run indefinitely.
    request_timeout = 10

    config = load_configs(config_paths)
    servers = {
        section: {
            "url": config.get(section, "url").rstrip("/"),
            "username": config.get(section, "username"),
            "password": config.get(section, "password"),
        }
        for section in config.sections()
        if section.startswith("server")
    }
    clients = {
        section: {
            "ip": config.get(section, "ip"),
            # Drop empty entries (e.g. from a trailing comma) so that no
            # degenerate "||^$client=..." rule is generated.
            "services": [
                name.strip()
                for name in config.get(section, "services").split(",")
                if name.strip()
            ],
            "schedule": config.get(section, "schedule"),
        }
        for section in config.sections()
        if section.startswith("client:")
    }

    for srv_name, srv in servers.items():
        if not silent:
            print(f"--- Przetwarzanie serwera {srv_name} ---")
        auth = HTTPBasicAuth(srv["username"], srv["password"])
        if not test_server_connection(srv["url"], auth):
            if not silent:
                print(f"[{srv['url']}] Niedostępny / brak połączenia.")
            continue
        try:
            payload = {"rules": _build_rules(clients, silent)}
            endpoint = f"{srv['url']}/control/filtering/set_rules"
            response = requests.post(
                endpoint,
                json=payload,
                auth=auth,
                headers={"Content-Type": "application/json"},
                timeout=request_timeout,
            )
            if not silent:
                if response.status_code in [200, 204]:
                    print(f"[{srv['url']}] Reguły zaktualizowane.")
                else:
                    print(f"[{srv['url']}] Błąd HTTP: {response.status_code} {response.text}")
        except Exception as e:
            # Best-effort per server: a failure on one server must not stop
            # the remaining servers from being updated.
            if not silent:
                print(f"[{srv['url']}] Błąd: {e}")
if __name__ == "__main__":
    # Command-line entry point: handle --help / --install-cron first, then
    # work out which configuration files to load and run main().
    args = sys.argv[1:]
    if "--help" in args or "-h" in args:
        print_help()
        sys.exit(0)
    if "--install-cron" in args:
        install_cron()
        sys.exit(0)

    silent = "--cron" in args
    # Positional arguments are whatever remains after dropping "--" flags.
    args = [a for a in args if not a.startswith("--")]

    config_files = []
    if "--config-dir" in sys.argv:
        try:
            dir_index = sys.argv.index("--config-dir") + 1
            config_dir = sys.argv[dir_index]
        except IndexError:
            print("Brak ścieżki po --config-dir")
            sys.exit(1)
        # glob() returns matches in arbitrary order; sort them so that the
        # override order between files is deterministic.
        config_files = sorted(glob(os.path.join(config_dir, "*.ini")))
    elif args:
        config_files = [args[0]]
    else:
        config_files = [os.path.join(os.path.dirname(__file__), "config.ini")]

    try:
        main(config_files, silent=silent)
    except (ValueError, configparser.Error) as exc:
        # Report invalid or unparsable configuration without a traceback.
        print(f"Błąd konfiguracji: {exc}", file=sys.stderr)
        sys.exit(1)