"""Summarise AdGuard Home query-log entries for a single client IP.

Each log line is expected to be one JSON object.  Only the "IP", "QH" and
"Result" keys are read.  Plain-text and gzip-compressed (``.gz``) files are
both supported.
"""
import glob
import gzip
import ipaddress
import json
import sys
from collections import Counter

try:
    import tldextract
except ImportError:
    # Only needed to consolidate sub-domains (--top without --raw); plain
    # counting and raw mode keep working without the package installed.
    tldextract = None


def _query_host(record):
    """Return the record's "QH" value lower-cased, or "" if it is not a string."""
    host = record.get("QH")
    return host.lower() if isinstance(host, str) else ""


def _registrable_domain(host):
    """Collapse *host* to ``domain.suffix`` using tldextract.

    Empty parts are dropped, so suffix-less hosts ("localhost", IP literals)
    do not end up with a trailing dot and bare suffixes get no leading dot.
    """
    extracted = tldextract.extract(host)
    return ".".join(part for part in (extracted.domain, extracted.suffix) if part)


def _looks_like_ip(arg):
    """Return True if *arg* should be treated as the client IP argument."""
    parts = arg.split(".")
    # Original lenient dotted-quad check, kept for backward compatibility.
    if len(parts) == 4 and all(p.isdigit() for p in parts):
        return True
    # Additionally accept anything the stdlib parses (notably IPv6).
    try:
        ipaddress.ip_address(arg)
    except ValueError:
        return False
    return True


def matches(record, ip, keywords, unblocked_only):
    """Return True if *record* passes the IP, keyword and blocked filters.

    Args:
        record: One decoded log entry (a dict).
        ip: Client IP; compared for string equality with ``record["IP"]``.
        keywords: Substrings matched case-insensitively against "QH".
            An empty list matches every host.
        unblocked_only: If true, reject records whose "Result" is a
            non-empty dict.
    """
    if record.get("IP") != ip:
        return False

    if keywords:
        host = _query_host(record)
        if not any(keyword.lower() in host for keyword in keywords):
            return False

    if unblocked_only:
        # A missing, None or empty "Result" is treated as unblocked.
        # NOTE(review): any non-empty Result dict counts as blocked here;
        # this may also exclude allow-listed/rewritten queries - confirm
        # against real log data.
        result = record.get("Result")
        if isinstance(result, dict) and result:
            return False

    return True


def process_file(filepath, ip, keywords, domain_counter=None, raw_mode=False, unblocked_only=False):
    """Count matching entries in one log file.

    Args:
        filepath: Log file path; opened with gzip when it ends in ".gz".
        ip, keywords, unblocked_only: Forwarded to :func:`matches`.
        domain_counter: Optional Counter updated in place with one hit per
            matching entry that has a non-empty host.
        raw_mode: If true, count full host names; otherwise count the
            consolidated ``domain.suffix`` form.

    Returns:
        Number of matching entries read before any file-level error.

    Raises:
        RuntimeError: If consolidation is requested but tldextract is not
            installed.
    """
    counting = domain_counter is not None
    if counting and not raw_mode and tldextract is None:
        raise RuntimeError(
            "tldextract is required to consolidate domains; install it or use raw mode"
        )

    matched_count = 0
    open_func = gzip.open if filepath.endswith(".gz") else open

    try:
        with open_func(filepath, "rt", encoding="utf-8", errors="ignore") as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue

                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    print(f"Warning: invalid JSON on line {line_num} in {filepath}")
                    continue

                # Valid JSON that is not an object (list, number, ...) must
                # not abort the rest of the file.
                if not isinstance(record, dict):
                    print(f"Warning: skipping non-object JSON on line {line_num} in {filepath}")
                    continue

                if not matches(record, ip, keywords, unblocked_only):
                    continue
                matched_count += 1

                if not counting:
                    continue
                host = _query_host(record)
                if not host:
                    continue
                domain = host if raw_mode else _registrable_domain(host)
                if domain:
                    domain_counter[domain] += 1
    except FileNotFoundError:
        print(f"Error: File not found: {filepath}")
    except Exception as e:
        # Deliberate best effort: report the problem and move on to the
        # next file instead of aborting the whole run.
        print(f"Error while processing file {filepath}: {e}")

    print(f"Matched {matched_count} entries in {filepath}.\n")
    return matched_count


def print_top_domains(domain_counter, limit):
    """Print a ranked table of the *limit* most common domains."""
    print(f"\nTop {limit} visited domains:")
    print(f"{'#':<4} {'Count':<8} Domain")
    print("-" * 40)
    for i, (domain, count) in enumerate(domain_counter.most_common(limit), 1):
        print(f"{i:<4} {count:<8} {domain}")


def print_help():
    """Print the command-line usage text."""
    print("""
AdGuard Home Log Analyzer
=========================

Usage:
  python3 find_adguard_log.py <file_pattern>... <ip> [keyword1 keyword2 ...] [--top N] [--raw] [--unblocked-only]

Positional arguments:
  <file_pattern>...   One or more file patterns (e.g., querylog*.json or logs/*.gz)
  <ip>                IP address to filter (e.g., 192.168.1.10)
  [keywords...]       Optional keywords to match in domain queries (e.g., facebook netflix)

Options:
  --top N             Show top N most visited domains (default: off)
  --raw               Do NOT consolidate subdomains
                      (e.g., keep 'api.apple.com' separate from 'itunes.apple.com')
  --unblocked-only    Show only queries that were NOT blocked by AdGuard
  --help, -h          Show this help message

Features:
  - Supports plain .json and .json.gz log files
  - Accepts wildcards (e.g., 'querylog*.json*')
  - Prints only final summary (not every matching line)
  - Automatically consolidates domains unless --raw is used

Examples:
  python3 find_adguard_log.py logs/querylog*.json.gz 192.168.1.5
  python3 find_adguard_log.py logs/querylog*.json* 172.16.0.25 facebook google
  python3 find_adguard_log.py querylog*.json.gz 192.168.1.8 --top 100 --unblocked-only
  python3 find_adguard_log.py *.json.gz 10.0.0.12 youtube --top 50 --raw
""")


def parse_arguments(argv):
    """Split *argv* into positional arguments and option values.

    Returns:
        Tuple ``(args, top_limit, raw_mode, unblocked_only, show_help)``
        where *args* holds the non-option arguments in their original order
        and *top_limit* is None unless ``--top N`` was given.

    Exits with status 1 if ``--top`` is not followed by a number.
    """
    top_limit = None
    raw_mode = False
    unblocked_only = False
    show_help = False
    args = []

    i = 0
    while i < len(argv):
        token = argv[i]
        if token in ("--help", "-h"):
            show_help = True
        elif token == "--top":
            # isdecimal() rather than isdigit(): the latter accepts
            # characters such as superscripts that int() cannot convert.
            if i + 1 < len(argv) and argv[i + 1].isdecimal():
                top_limit = int(argv[i + 1])
                i += 1  # also consume the value
            else:
                print("Error: --top must be followed by a number")
                sys.exit(1)
        elif token == "--raw":
            raw_mode = True
        elif token == "--unblocked-only":
            unblocked_only = True
        else:
            args.append(token)
        i += 1

    return args, top_limit, raw_mode, unblocked_only, show_help


def main():
    """Command-line entry point: parse arguments, scan files, print summary."""
    args, top_limit, raw_mode, unblocked_only, show_help = parse_arguments(sys.argv[1:])

    if show_help or len(args) < 2:
        print_help()
        sys.exit(0 if show_help else 1)

    ip = None
    file_patterns = []
    keywords = []

    # The first argument that looks like an IP splits the positionals:
    # everything before it is a file pattern, everything after a keyword.
    for i, arg in enumerate(args):
        if _looks_like_ip(arg):
            ip = arg
            file_patterns = args[:i]
            keywords = args[i + 1:]
            break

    if not ip or not file_patterns:
        print("Error: Please provide at least one log file pattern and an IP address.")
        sys.exit(1)

    if top_limit and not raw_mode and tldextract is None:
        print("Error: the 'tldextract' package is required for --top unless --raw is used.")
        sys.exit(1)

    log_files = []
    for pattern in file_patterns:
        matched = glob.glob(pattern)
        if not matched:
            print(f"Warning: No files matched pattern: {pattern}")
        log_files.extend(matched)

    if not log_files:
        print("Error: No matching log files found.")
        sys.exit(1)

    total_matches = 0
    domain_counter = Counter() if top_limit else None

    for log_file in sorted(log_files):
        total_matches += process_file(
            log_file, ip, keywords, domain_counter, raw_mode, unblocked_only
        )

    print(f"\nTotal matched entries across all files: {total_matches}")

    if top_limit and total_matches > 0:
        print_top_domains(domain_counter, top_limit)


if __name__ == "__main__":
    main()