216 lines
7.7 KiB
Python
216 lines
7.7 KiB
Python
import sys
|
|
import os
|
|
import configparser
|
|
import requests
|
|
from requests.auth import HTTPBasicAuth
|
|
import subprocess
|
|
from glob import glob
|
|
from datetime import datetime
|
|
import re # Dodane dla walidacji regexów
|
|
|
|
# Options that every config section whose name starts with "server" must define
# (enforced by validate_config).
REQUIRED_SERVER_KEYS = {"url", "username", "password"}
|
|
# Options that every config section whose name starts with "client:" must define
# (enforced by validate_config).
REQUIRED_CLIENT_KEYS = {"ip", "services", "schedule"}
|
|
|
|
def should_block(schedule, now=None):
    """Decide whether a schedule calls for blocking at a given hour.

    Supported schedule formats:

    * ``"whole"`` - block at all times.
    * ``"custom:START-END"`` - block while the hour lies in ``[START, END)``.
      When ``START > END`` the window wraps past midnight, so
      ``custom:22-6`` covers 22:00 through 05:59.

    Args:
        schedule: Schedule string taken from a client section.
        now: Hour of day to evaluate against. Defaults to the current local
            hour; exposed so the window logic can be exercised
            deterministically.

    Returns:
        True if blocking should be active, False otherwise. Unknown or
        malformed schedules never block.
    """
    if schedule == "whole":
        return True
    if not schedule.startswith("custom:"):
        return False

    try:
        # Only the field directly after "custom" is read; anything after a
        # further ":" is ignored.
        start, end = map(int, schedule.split(":")[1].split("-"))
    except (ValueError, IndexError):
        # Malformed hour range: do not block, but let unrelated errors
        # (e.g. KeyboardInterrupt) propagate instead of swallowing them.
        return False

    if now is None:
        now = datetime.now().hour

    if start <= end:
        return start <= now < end
    # Window wraps around midnight.
    return now >= start or now < end
|
|
|
|
def show_block_details(ip, service, schedule):
    """Print a one-line summary of a single block entry.

    Args:
        ip: Client IP the entry applies to.
        service: Service/domain/rule text being blocked.
        schedule: Schedule string of the client.
    """
    message = f"Blokada: IP={ip} | Serwis={service} | Harmonogram={schedule}"
    print(message)
|
|
|
|
def install_cron():
    """Add an hourly crontab entry that runs this script in ``--cron`` mode.

    The entry invokes the current interpreter on this file with the
    ``config.ini`` located next to it, discarding all output. If an identical
    entry is already present in the user's crontab nothing is changed.

    Outcome is reported on stdout; failures (no ``crontab`` binary, or
    ``crontab -`` exiting non-zero) are reported on stderr and the function
    returns without raising.
    """
    import shlex  # local import: only needed when installing the cron entry

    script_path = os.path.abspath(__file__)
    config_path = os.path.join(os.path.dirname(script_path), "config.ini")

    # cron hands the command to a shell, so each path is quoted to survive
    # spaces or shell metacharacters. shlex.quote leaves already-safe strings
    # untouched, so entries installed earlier for ordinary paths still match
    # the duplicate check below.
    command = " ".join(
        shlex.quote(part) for part in (sys.executable, script_path, config_path)
    )
    cron_line = f"0 * * * * {command} --cron > /dev/null 2>&1"

    try:
        listing = subprocess.run(["crontab", "-l"], capture_output=True, text=True)
    except FileNotFoundError:
        print("Nie znaleziono polecenia crontab.", file=sys.stderr)
        return
    # A non-zero exit from "crontab -l" is treated as an empty crontab.
    existing_cron = listing.stdout if listing.returncode == 0 else ""

    if cron_line in existing_cron:
        print("Cron już istnieje.")
        return

    kept = existing_cron.strip()
    # Avoid a leading blank line when there were no previous entries.
    updated_cron = f"{kept}\n{cron_line}\n" if kept else f"{cron_line}\n"

    written = subprocess.run(["crontab", "-"], input=updated_cron, text=True)
    if written.returncode != 0:
        # Do not claim success when crontab rejected the new table.
        print("Nie udało się zaktualizować crona.", file=sys.stderr)
        return
    print("Dodano do crona.")
|
|
|
|
def print_help():
    """Print command-line usage, the available options and examples."""
    help_lines = (
        "Użycie:",
        " python adguard_scheduler.py [config.ini] [--config-dir DIR] [--install-cron|--cron|--help|-h]",
        "\nOpcje:",
        " --install-cron Dodaje zadanie do crona co godzinę",
        " --cron Uruchomienie bez outputu (do crona)",
        " --config-dir DIR Wczytaj wszystkie pliki .ini z katalogu DIR",
        " --help, -h Pokazuje pomoc",
        "\nPrzykłady:",
        " python adguard_scheduler.py",
        " python adguard_scheduler.py /ścieżka/config.ini",
        " python adguard_scheduler.py --config-dir /etc/adguard_configs",
        " python adguard_scheduler.py --install-cron",
        " python adguard_scheduler.py /cfg.ini --cron",
    )
    # One print call per entry, exactly as many writes as there are entries.
    for text in help_lines:
        print(text)
|
|
|
|
def validate_config(config):
    """Verify that server and client sections define their required options.

    Sections whose name starts with ``server`` are checked against
    ``REQUIRED_SERVER_KEYS``; sections whose name starts with ``client:`` are
    checked against ``REQUIRED_CLIENT_KEYS``. Other sections are not checked.

    Args:
        config: Parsed ``configparser.ConfigParser`` instance.

    Raises:
        ValueError: On the first section that lacks a required option.
    """
    requirements = (
        ("server", REQUIRED_SERVER_KEYS),
        ("client:", REQUIRED_CLIENT_KEYS),
    )
    for name in config.sections():
        present = set(config[name].keys())
        for prefix, required in requirements:
            if name.startswith(prefix) and not required <= present:
                raise ValueError(f"Błędna konfiguracja w {name}: wymagane {required}")
|
|
|
|
def is_valid_regex(pattern):
    """Return True if ``pattern`` compiles as a regular expression.

    Used as an optional sanity check for regex-style entries.
    """
    try:
        re.compile(pattern)
    except re.error:
        compiles = False
    else:
        compiles = True
    return compiles
|
|
|
|
def expand_aliases(config):
    """Collect alias definitions from the ``alias:<name>`` config sections.

    Each such section's ``domains`` option is split on commas; blank entries
    are dropped. Entries written as ``/.../`` are treated as regexes: if the
    text between the slashes does not compile, a warning is printed and the
    entry is skipped.

    Args:
        config: Parsed ``configparser.ConfigParser`` instance.

    Returns:
        Dict mapping alias name to the list of accepted entries, in the
        order they appear in the config.
    """
    aliases = {}
    for section in config.sections():
        if not section.startswith("alias:"):
            continue

        alias_name = section.split(":")[1]
        accepted = []
        for chunk in config.get(section, "domains").split(","):
            entry = chunk.strip()
            if not entry:
                continue

            # Optional regex validation, applied only to /.../ entries.
            looks_like_regex = entry.startswith("/") and entry.endswith("/")
            if looks_like_regex and not is_valid_regex(entry[1:-1]):
                print(f"⚠️ Nieprawidłowy regex w aliasie {alias_name}: {entry}")
                continue

            accepted.append(entry)

        aliases[alias_name] = accepted
    return aliases
|
|
|
|
def load_configs(files):
    """Read several INI files into one validated ``ConfigParser``.

    Files are processed in the given order. When a section name occurs in
    more than one file, the later file's section replaces the earlier one
    entirely (options are not merged key by key). Paths that are not existing
    files are skipped silently.

    Args:
        files: Iterable of config file paths.

    Returns:
        The merged ``configparser.ConfigParser``.

    Raises:
        ValueError: If the merged configuration fails ``validate_config``.
    """
    merged = configparser.ConfigParser()
    for path in files:
        if not os.path.isfile(path):
            continue

        parsed = configparser.ConfigParser()
        parsed.read(path)

        for section in parsed.sections():
            if merged.has_section(section):
                # Replace the earlier definition in place, keeping the
                # section's original position in the merged config.
                for option in merged.options(section):
                    merged.remove_option(section, option)
            else:
                merged.add_section(section)

            # Copy *raw* values. Copying interpolated values would turn an
            # escaped "%%" into a bare "%", which the merged parser then
            # rejects as invalid interpolation syntax when it is set.
            for option, raw_value in parsed.items(section, raw=True):
                merged.set(section, option, raw_value)

    validate_config(merged)
    return merged
|
|
|
|
def test_server_connection(url, auth):
    """Check whether the server answers its status endpoint with HTTP 200.

    Args:
        url: Base URL of the server, without a trailing slash.
        auth: ``requests`` auth object used for the request.

    Returns:
        True if ``GET {url}/control/status`` returns status 200 within five
        seconds; False on any other status or on a request failure
        (connection error, timeout, invalid URL, ...).
    """
    try:
        response = requests.get(f"{url}/control/status", auth=auth, timeout=5)
    except requests.RequestException:
        # Only request failures mean "unreachable"; KeyboardInterrupt,
        # SystemExit and programming errors are no longer swallowed.
        return False
    return response.status_code == 200
|
|
|
|
# Upper bound in seconds for the rule upload. Without a timeout a stalled
# server would keep the (hourly, cron-driven) run hanging indefinitely.
_SET_RULES_TIMEOUT = 10

# Entries starting with one of these are passed through with their own
# syntax (regex, exceptions, etc.) instead of being wrapped as "||name^".
_RAW_RULE_PREFIXES = ("||", "@@", "/", "!", "#", "127.0.0.1")


def _expand_services(raw_value, aliases):
    """Split a comma-separated services value and expand alias names."""
    expanded = []
    for name in (part.strip() for part in raw_value.split(",")):
        if not name:
            # Blank entries (e.g. from a trailing comma) would otherwise be
            # sent as the malformed rule "||^$client=...".
            continue
        if name in aliases:
            expanded.extend(aliases[name])
        else:
            expanded.append(name)
    return expanded


def _build_rule(service, ip):
    """Return the filtering rule text for one service restricted to one client IP."""
    if service.startswith(_RAW_RULE_PREFIXES):
        return f"{service}$client={ip}"
    return f"||{service}^$client={ip}"  # default rule form


def main(config_paths, silent=False):
    """Load the configuration and upload the currently active rules to each server.

    For every ``server*`` section the server is probed first; unreachable
    servers are skipped. For reachable servers, the rules of all clients
    whose schedule is active right now are collected, de-duplicated, sorted
    and POSTed to ``{url}/control/filtering/set_rules``.

    Args:
        config_paths: Config file paths, passed to ``load_configs``.
        silent: When True nothing is printed by this function, including
            per-server errors (intended for cron runs).

    Raises:
        ValueError: If the configuration is invalid (from ``load_configs``).
    """
    config = load_configs(config_paths)
    aliases = expand_aliases(config)

    servers = {
        section: {
            "url": config.get(section, "url").rstrip("/"),
            "username": config.get(section, "username"),
            "password": config.get(section, "password"),
        }
        for section in config.sections()
        if section.startswith("server")
    }

    clients = {
        section: {
            "ip": config.get(section, "ip"),
            "services": _expand_services(config.get(section, "services"), aliases),
            "schedule": config.get(section, "schedule"),
        }
        for section in config.sections()
        if section.startswith("client:")
    }

    for srv_name, srv in servers.items():
        if not silent:
            print(f"--- Przetwarzanie serwera {srv_name} ---")

        auth = HTTPBasicAuth(srv["username"], srv["password"])
        if not test_server_connection(srv["url"], auth):
            if not silent:
                print(f"[{srv['url']}] Niedostępny / brak połączenia.")
            continue

        try:
            rules = set()
            for client in clients.values():
                if not should_block(client["schedule"]):
                    continue
                for service in client["services"]:
                    rules.add(_build_rule(service, client["ip"]))
                    if not silent:
                        show_block_details(client["ip"], service, client["schedule"])

            response = requests.post(
                f"{srv['url']}/control/filtering/set_rules",
                json={"rules": sorted(rules)},
                auth=auth,
                headers={"Content-Type": "application/json"},
                timeout=_SET_RULES_TIMEOUT,
            )

            if not silent:
                if response.status_code in (200, 204):
                    print(f"[{srv['url']}] Reguły zaktualizowane.")
                else:
                    print(f"[{srv['url']}] Błąd HTTP: {response.status_code} {response.text}")

        except Exception as e:
            # Per-server boundary: a failure on one server must not stop the
            # remaining servers from being processed.
            if not silent:
                print(f"[{srv['url']}] Błąd: {e}")
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point. Option handling, in order of precedence:
    #   --help / -h       print usage and exit
    #   --install-cron    register the hourly cron entry and exit
    #   --config-dir DIR  load every *.ini file from DIR
    #   <path>            first non-option argument is a single config file
    #   (nothing)         config.ini next to this script
    # --cron may be combined with any config selection and silences output.
    args = sys.argv[1:]

    if "--help" in args or "-h" in args:
        print_help()
        sys.exit(0)

    if "--install-cron" in args:
        install_cron()
        sys.exit(0)

    silent = "--cron" in args

    if "--config-dir" in args:
        dir_index = args.index("--config-dir") + 1
        # A following option flag (e.g. "--config-dir --cron") is not a
        # directory; report it as a missing path instead of globbing it.
        if dir_index >= len(args) or args[dir_index].startswith("--"):
            print("Brak ścieżki po --config-dir")
            sys.exit(1)
        # glob order is unspecified; sort so that the precedence between
        # files defining the same section is reproducible.
        config_files = sorted(glob(os.path.join(args[dir_index], "*.ini")))
    else:
        positional = [a for a in args if not a.startswith("--")]
        if positional:
            config_files = [positional[0]]
        else:
            config_files = [os.path.join(os.path.dirname(__file__), "config.ini")]

    main(config_files, silent=silent)
|