From 368e62ad97005affc8c446f290e0f8e109d3ee73 Mon Sep 17 00:00:00 2001
From: gru
Date: Sun, 15 Jun 2025 20:01:50 +0200
Subject: [PATCH] Add adguard_log_parser.py

---
 adguard_log_parser.py | 161 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 161 insertions(+)
 create mode 100644 adguard_log_parser.py

diff --git a/adguard_log_parser.py b/adguard_log_parser.py
new file mode 100644
index 0000000..d06a54a
--- /dev/null
+++ b/adguard_log_parser.py
@@ -0,0 +1,161 @@
+import json
+import sys
+import gzip
+import glob
+import tldextract
+from collections import Counter
+
+def matches(record, ip, keywords):
+    if record.get("IP") != ip:
+        return False
+    if not keywords:
+        return True
+    domain = record.get("QH", "").lower()
+    return any(keyword.lower() in domain for keyword in keywords)
+
+def process_file(filepath, ip, keywords, domain_counter=None, raw_mode=False):
+    matched_count = 0
+    open_func = gzip.open if filepath.endswith(".gz") else open
+
+    try:
+        with open_func(filepath, "rt", encoding="utf-8", errors="ignore") as f:
+            for line_num, line in enumerate(f, 1):
+                line = line.strip()
+                if not line:
+                    continue
+                try:
+                    record = json.loads(line)
+                    if matches(record, ip, keywords):
+                        matched_count += 1
+                        if domain_counter is not None:
+                            full_host = record.get("QH", "").lower()
+                            if full_host:
+                                if raw_mode:
+                                    domain = full_host
+                                else:
+                                    extracted = tldextract.extract(full_host)
+                                    domain = f"{extracted.domain}.{extracted.suffix}" if extracted.suffix else extracted.domain
+                                if domain:
+                                    domain_counter[domain] += 1
+                except json.JSONDecodeError:
+                    print(f"Warning: invalid JSON on line {line_num} in {filepath}")
+                    continue
+    except FileNotFoundError:
+        print(f"Error: File not found: {filepath}")
+    except Exception as e:
+        print(f"Error while processing file {filepath}: {e}")
+
+    print(f"Matched {matched_count} entries in {filepath}.\n")
+    return matched_count
+
+def print_top_domains(domain_counter, limit):
+    print(f"\nTop {limit} visited domains:")
+    print(f"{'#':<4} {'Count':<8} Domain")
+    print("-" * 40)
+    for i, (domain, count) in enumerate(domain_counter.most_common(limit), 1):
+        print(f"{i:<4} {count:<8} {domain}")
+
+def print_help():
+    print("""
+AdGuard Home Log Analyzer
+=========================
+
+Usage:
+    python3 adguard_log_parser.py <file patterns...> <ip> [keyword1 keyword2 ...] [--top N] [--raw]
+
+Positional arguments:
+    <file patterns...>  One or more file patterns (e.g., querylog*.json or logs/*.gz)
+    <ip>                IP address to filter (e.g., 192.168.1.10)
+    [keywords...]       Optional keywords to match in domain queries (e.g., facebook netflix)
+
+Options:
+    --top N             Show top N most visited domains (default: off)
+    --raw               Do NOT consolidate subdomains (e.g., keep 'api.apple.com' separate from 'itunes.apple.com')
+    --help, -h          Show this help message
+
+Features:
+    - Supports plain .json and .json.gz log files
+    - Accepts wildcards (e.g., 'querylog*.json*')
+    - Prints only a final summary (not every matching line)
+    - Automatically consolidates domains unless --raw is used
+
+Examples:
+    python3 adguard_log_parser.py logs/querylog*.json.gz 192.168.1.5
+    python3 adguard_log_parser.py logs/querylog*.json* 172.16.0.25 facebook google
+    python3 adguard_log_parser.py querylog*.json.gz 192.168.1.8 --top 100
+    python3 adguard_log_parser.py *.json.gz 10.0.0.12 youtube --top 50 --raw
+""")
+
+def parse_arguments(argv):
+    top_limit = None
+    raw_mode = False
+    show_help = False
+    args = []
+    i = 0
+    while i < len(argv):
+        if argv[i] in ('--help', '-h'):
+            show_help = True
+            i += 1
+        elif argv[i] == '--top':
+            if i + 1 < len(argv) and argv[i + 1].isdigit():
+                top_limit = int(argv[i + 1])
+                i += 2
+            else:
+                print("Error: --top must be followed by a number")
+                sys.exit(1)
+        elif argv[i] == '--raw':
+            raw_mode = True
+            i += 1
+        else:
+            args.append(argv[i])
+            i += 1
+    return args, top_limit, raw_mode, show_help
+
+def main():
+    raw_args = sys.argv[1:]
+    args, top_limit, raw_mode, show_help = parse_arguments(raw_args)
+
+    if show_help or len(args) < 2:
+        print_help()
+        sys.exit(0 if show_help else 1)
+
+    ip = None
+    file_patterns, keywords = [], []
+
+    # Find IP in args (simple check for IPv4 format)
+    for i, arg in enumerate(args):
+        parts = arg.split('.')
+        if len(parts) == 4 and all(p.isdigit() for p in parts):
+            ip = arg
+            file_patterns = args[:i]
+            keywords = args[i+1:]
+            break
+
+    if not ip or not file_patterns:
+        print("Error: Please provide at least one log file pattern and an IP address.")
+        sys.exit(1)
+
+    log_files = []
+    for pattern in file_patterns:
+        matched = glob.glob(pattern)
+        if not matched:
+            print(f"Warning: No files matched pattern: {pattern}")
+        log_files.extend(matched)
+
+    if not log_files:
+        print("Error: No matching log files found.")
+        sys.exit(1)
+
+    total_matches = 0
+    domain_counter = Counter() if top_limit else None
+
+    for log_file in sorted(log_files):
+        total_matches += process_file(log_file, ip, keywords, domain_counter, raw_mode)
+
+    print(f"\nTotal matched entries across all files: {total_matches}")
+
+    if top_limit and total_matches > 0:
+        print_top_domains(domain_counter, top_limit)
+
+if __name__ == "__main__":
+    main()
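
A quick way to sanity-check the matching logic without real logs (a minimal sketch, assuming the script is importable as adguard_log_parser and that tldextract is installed; only the "IP" and "QH" fields matter to matches(), the other fields in the sample record are illustrative):

    # Hypothetical smoke test; the sample dict mimics the fields the parser reads from one query-log line.
    from adguard_log_parser import matches

    sample = {"T": "2025-06-15T20:01:50+02:00", "QH": "edge.api.netflix.com", "IP": "192.168.1.5"}

    print(matches(sample, "192.168.1.5", ["netflix"]))  # True: client IP and keyword both match
    print(matches(sample, "192.168.1.5", []))           # True: no keywords, so the IP match alone suffices
    print(matches(sample, "10.0.0.12", ["netflix"]))    # False: different client IP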