Files
skrypty_narzedzia/adguard_log_parser.py
2025-06-15 20:14:47 +02:00

192 lines
6.6 KiB
Python

import json
import sys
import gzip
import glob
import tldextract
from collections import Counter
def matches(record, ip, keywords, unblocked_only):
    """Return True when *record* is a query from *ip* that passes the filters.

    A record passes when its "IP" field equals *ip*, its "QH" (queried host)
    contains at least one of *keywords* (case-insensitive; an empty keyword
    list matches everything), and — if *unblocked_only* is set — its "Result"
    field is not a non-empty dict (a non-empty Result means AdGuard applied
    some rule to the query).
    """
    if record.get("IP") != ip:
        return False
    if keywords:
        host = record.get("QH", "").lower()
        if not any(kw.lower() in host for kw in keywords):
            return False
    if unblocked_only:
        verdict = record.get("Result")
        # Truthy dict == AdGuard produced a filtering result == blocked.
        if isinstance(verdict, dict) and verdict:
            return False
    return True
def process_file(filepath, ip, keywords, domain_counter=None, raw_mode=False, unblocked_only=False, collect_raw=False):
    """Scan one AdGuard query-log file and tally/collect matching entries.

    Args:
        filepath: Path to a query log with one JSON record per line; files
            ending in ".gz" are transparently decompressed.
        ip: Client IP that records must match.
        keywords: Substrings required in the queried host (empty = match all).
        domain_counter: Optional Counter incremented once per matched record,
            keyed by domain. Ignored when collect_raw is True.
        raw_mode: When True, count full hostnames instead of consolidating
            subdomains into the registered domain.
        unblocked_only: When True, skip records that AdGuard blocked.
        collect_raw: When True, also return the matched records themselves.

    Returns:
        Tuple (matched_count, matched_records); matched_records is a list when
        collect_raw is True, otherwise None. Prints a per-file summary and
        warnings/errors instead of raising.
    """
    matched_count = 0
    matched_records = [] if collect_raw else None
    open_func = gzip.open if filepath.endswith(".gz") else open
    try:
        with open_func(filepath, "rt", encoding="utf-8", errors="ignore") as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                # Keep the try narrow: only JSON decoding can raise here.
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    print(f"Warning: invalid JSON on line {line_num} in {filepath}")
                    continue
                if not matches(record, ip, keywords, unblocked_only):
                    continue
                matched_count += 1
                if collect_raw:
                    matched_records.append(record)
                elif domain_counter is not None:
                    full_host = record.get("QH", "").lower()
                    if full_host:
                        if raw_mode:
                            domain = full_host
                        else:
                            # Consolidate subdomains into the registered
                            # domain. Fall back to the full host for names
                            # with no public suffix (e.g. "localhost"),
                            # instead of emitting a malformed "host." entry
                            # as the old f-string join did.
                            extracted = tldextract.extract(full_host)
                            domain = extracted.registered_domain or full_host
                        if domain:
                            domain_counter[domain] += 1
    except FileNotFoundError:
        print(f"Error: File not found: {filepath}")
    except Exception as e:
        # Best-effort CLI tool: report and move on to the next file.
        print(f"Error while processing file {filepath}: {e}")
    print(f"Matched {matched_count} entries in {filepath}.\n")
    return matched_count, matched_records
def print_top_domains(domain_counter, limit):
    """Print a ranked table of the *limit* most frequently queried domains."""
    print(f"\nTop {limit} visited domains:")
    print(f"{'#':<4} {'Count':<8} Domain")
    print("-" * 40)
    rank = 0
    for domain, count in domain_counter.most_common(limit):
        rank += 1
        print(f"{rank:<4} {count:<8} {domain}")
def print_raw_records(records):
    """Print one 'timestamp | IP | Query' line per matched record."""
    print("\nRaw matched entries:")
    for entry in records:
        stamp = entry.get("T", "")
        client = entry.get("IP", "")
        query = entry.get("QH", "")
        print(f"{stamp} | IP: {client} | Query: {query}")
def print_help():
    """Print the CLI usage/help text to stdout.

    The body is a single runtime string literal, so its content (including the
    script name it advertises) is left untouched.
    """
    print("""
AdGuard Home Log Analyzer
=========================
Usage:
python3 find_adguard_log.py <log_pattern(s)> <IP> [keyword1 keyword2 ...] [--top N] [--raw] [--unblocked-only]
Positional arguments:
<log_pattern(s)> One or more file patterns (e.g., querylog*.json or logs/*.gz)
<IP> IP address to filter (e.g., 192.168.1.10)
[keywords...] Optional keywords to match in domain queries (e.g., facebook netflix)
Options:
--top N Show top N most visited domains (default: off)
--raw Show ALL matched entries (with timestamps and full domain), no domain consolidation, no top list
--unblocked-only Show only queries that were NOT blocked by AdGuard
--help, -h Show this help message
Features:
- Supports plain .json and .json.gz log files
- Accepts wildcards (e.g., 'querylog*.json*')
- Prints only final summary (not every matching line)
- Automatically consolidates domains unless --raw is used
Examples:
python3 find_adguard_log.py logs/querylog*.json.gz 192.168.1.5
python3 find_adguard_log.py logs/querylog*.json* 172.16.0.25 facebook google
python3 find_adguard_log.py querylog*.json.gz 192.168.1.8 --top 100 --unblocked-only
python3 find_adguard_log.py *.json.gz 10.0.0.12 youtube --top 50 --raw
python3 find_adguard_log.py *.json.gz 10.0.0.12 youtube --raw --unblocked-only
""")
def parse_arguments(argv):
    """Split raw CLI tokens into positional arguments and option values.

    Returns:
        Tuple (positional_args, top_limit, raw_mode, unblocked_only,
        show_help). Exits with status 1 when --top is not followed by a
        non-negative integer.
    """
    top_limit = None
    raw_mode = False
    unblocked_only = False
    show_help = False
    positional = []
    idx = 0
    total = len(argv)
    while idx < total:
        token = argv[idx]
        if token in ('--help', '-h'):
            show_help = True
        elif token == '--top':
            if idx + 1 < total and argv[idx + 1].isdigit():
                top_limit = int(argv[idx + 1])
                idx += 1  # also consume the numeric value
            else:
                print("Error: --top must be followed by a number")
                sys.exit(1)
        elif token == '--raw':
            raw_mode = True
        elif token == '--unblocked-only':
            unblocked_only = True
        else:
            positional.append(token)
        idx += 1
    return positional, top_limit, raw_mode, unblocked_only, show_help
def main():
    """CLI entry point: parse arguments, expand log patterns, report matches.

    Exits 0 after printing help, 1 on missing/invalid arguments or when no
    log files match the given patterns.
    """
    raw_args = sys.argv[1:]
    args, top_limit, raw_mode, unblocked_only, show_help = parse_arguments(raw_args)
    if show_help or len(args) < 2:
        print_help()
        sys.exit(0 if show_help else 1)
    # The first dotted-quad positional argument is the client IP; everything
    # before it is file patterns, everything after it is domain keywords.
    ip = None
    keywords = []
    # Initialize explicitly: the original left file_patterns unbound when no
    # IP was found and relied on `or` short-circuiting to dodge a NameError.
    file_patterns = []
    for i, arg in enumerate(args):
        parts = arg.split('.')
        if len(parts) == 4 and all(p.isdigit() for p in parts):
            ip = arg
            file_patterns = args[:i]
            keywords = args[i + 1:]
            break
    if not ip or not file_patterns:
        print("Error: Please provide at least one log file pattern and an IP address.")
        sys.exit(1)
    log_files = []
    for pattern in file_patterns:
        matched = glob.glob(pattern)
        if not matched:
            print(f"Warning: No files matched pattern: {pattern}")
        log_files.extend(matched)
    if not log_files:
        print("Error: No matching log files found.")
        sys.exit(1)
    total_matches = 0
    # Domain consolidation only happens when a top list is requested and raw
    # output is off; in raw mode the records themselves are collected instead.
    domain_counter = Counter() if (top_limit and not raw_mode) else None
    collected_records = [] if raw_mode else None
    for log_file in sorted(log_files):
        count, records = process_file(log_file, ip, keywords, domain_counter, raw_mode, unblocked_only, collect_raw=raw_mode)
        total_matches += count
        if raw_mode and records:
            collected_records.extend(records)
    print(f"\nTotal matched entries across all files: {total_matches}")
    if raw_mode:
        print_raw_records(collected_records)
    elif top_limit and total_matches > 0:
        print_top_domains(domain_counter, top_limit)


if __name__ == "__main__":
    main()