Add adguard_log_parser.py
This commit is contained in:
161
adguard_log_parser.py
Normal file
161
adguard_log_parser.py
Normal file
@ -0,0 +1,161 @@
|
||||
import json
|
||||
import sys
|
||||
import gzip
|
||||
import glob
|
||||
import tldextract
|
||||
from collections import Counter
|
||||
|
||||
def matches(record, ip, keywords):
    """Decide whether a query-log *record* should be counted.

    A record matches when its "IP" field equals *ip* and, if *keywords*
    is non-empty, at least one keyword occurs case-insensitively in the
    queried hostname ("QH"). With no keywords, every record from *ip*
    matches.
    """
    if record.get("IP") != ip:
        return False
    if not keywords:
        return True
    host = record.get("QH", "").lower()
    for keyword in keywords:
        if keyword.lower() in host:
            return True
    return False
|
||||
|
||||
def process_file(filepath, ip, keywords, domain_counter=None, raw_mode=False):
    """Scan one AdGuard Home query-log file and count records matching *ip*/*keywords*.

    Parameters:
        filepath: path to a JSON-lines log file; files ending in ".gz" are
            read through gzip transparently.
        ip: client IP a record's "IP" field must equal (see ``matches``).
        keywords: optional substrings matched case-insensitively against "QH".
        domain_counter: optional collections.Counter updated once per matched
            record, keyed by domain.
        raw_mode: if True, count full hostnames as-is; otherwise consolidate
            to the registered domain via tldextract.

    Returns:
        Number of matched records. File-level errors are reported to stdout
        and yield 0 rather than raising, so a bad file doesn't abort a scan
        over many files.
    """
    matched_count = 0
    # Transparently support gzip-compressed logs.
    open_func = gzip.open if filepath.endswith(".gz") else open

    try:
        with open_func(filepath, "rt", encoding="utf-8", errors="ignore") as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    print(f"Warning: invalid JSON on line {line_num} in {filepath}")
                    continue
                if not matches(record, ip, keywords):
                    continue
                matched_count += 1
                if domain_counter is not None:
                    full_host = record.get("QH", "").lower()
                    if full_host:
                        if raw_mode:
                            domain = full_host
                        else:
                            extracted = tldextract.extract(full_host)
                            # BUGFIX: join only the non-empty parts. The old
                            # f"{domain}.{suffix}" produced a spurious trailing
                            # dot for hosts with no public suffix (e.g.
                            # "localhost" -> "localhost.").
                            domain = ".".join(
                                part for part in (extracted.domain, extracted.suffix) if part
                            )
                        if domain:
                            domain_counter[domain] += 1
    except FileNotFoundError:
        print(f"Error: File not found: {filepath}")
    except Exception as e:
        # Best-effort: report and fall through so remaining files still run.
        print(f"Error while processing file {filepath}: {e}")

    print(f"Matched {matched_count} entries in {filepath}.\n")
    return matched_count
|
||||
|
||||
def print_top_domains(domain_counter, limit):
    """Print a ranked table of the *limit* most frequent domains in *domain_counter*."""
    print(f"\nTop {limit} visited domains:")
    print(f"{'#':<4} {'Count':<8} Domain")
    print("-" * 40)
    rank = 1
    for domain, count in domain_counter.most_common(limit):
        print(f"{rank:<4} {count:<8} {domain}")
        rank += 1
|
||||
|
||||
def print_help():
    """Print CLI usage to stdout.

    BUGFIX: the usage text and examples referred to ``find_adguard_log.py``,
    but this script is ``adguard_log_parser.py`` (see the commit/file name);
    the help now names the correct script.
    """
    print("""
AdGuard Home Log Analyzer
=========================

Usage:
  python3 adguard_log_parser.py <log_pattern(s)> <IP> [keyword1 keyword2 ...] [--top N] [--raw]

Positional arguments:
  <log_pattern(s)>   One or more file patterns (e.g., querylog*.json or logs/*.gz)
  <IP>               IP address to filter (e.g., 192.168.1.10)
  [keywords...]      Optional keywords to match in domain queries (e.g., facebook netflix)

Options:
  --top N            Show top N most visited domains (default: off)
  --raw              Do NOT consolidate subdomains (e.g., keep 'api.apple.com' separate from 'itunes.apple.com')
  --help, -h         Show this help message

Features:
  - Supports plain .json and .json.gz log files
  - Accepts wildcards (e.g., 'querylog*.json*')
  - Prints only final summary (not every matching line)
  - Automatically consolidates domains unless --raw is used

Examples:
  python3 adguard_log_parser.py logs/querylog*.json.gz 192.168.1.5
  python3 adguard_log_parser.py logs/querylog*.json* 172.16.0.25 facebook google
  python3 adguard_log_parser.py querylog*.json.gz 192.168.1.8 --top 100
  python3 adguard_log_parser.py *.json.gz 10.0.0.12 youtube --top 50 --raw
""")
|
||||
|
||||
def parse_arguments(argv):
    """Split raw CLI tokens into positional args and recognized options.

    Returns a tuple ``(args, top_limit, raw_mode, show_help)`` where ``args``
    holds every token that is not a recognized option, ``top_limit`` is the
    integer following ``--top`` (or None when absent), ``raw_mode`` reflects
    ``--raw``, and ``show_help`` reflects ``--help``/``-h``.

    Exits the process with status 1 if ``--top`` is not followed by a number.
    """
    positionals = []
    top_limit = None
    raw_mode = False
    show_help = False

    idx = 0
    total = len(argv)
    while idx < total:
        token = argv[idx]
        if token in ('--help', '-h'):
            show_help = True
        elif token == '--raw':
            raw_mode = True
        elif token == '--top':
            lookahead = argv[idx + 1] if idx + 1 < total else None
            if lookahead is not None and lookahead.isdigit():
                top_limit = int(lookahead)
                idx += 1  # also consume the numeric value
            else:
                print("Error: --top must be followed by a number")
                sys.exit(1)
        else:
            positionals.append(token)
        idx += 1

    return positionals, top_limit, raw_mode, show_help
|
||||
|
||||
def main():
    """CLI entry point: parse arguments, expand log-file patterns, scan logs.

    Exits 0 after printing help, 1 on bad arguments or when no log files
    match any pattern.
    """
    raw_args = sys.argv[1:]
    args, top_limit, raw_mode, show_help = parse_arguments(raw_args)

    if show_help or len(args) < 2:
        print_help()
        sys.exit(0 if show_help else 1)

    ip = None
    keywords = []
    # BUGFIX: file_patterns was previously assigned only inside the loop
    # below, so when no argument looked like an IPv4 address the later
    # `if not ip or not file_patterns` check raised NameError instead of
    # printing the intended error message.
    file_patterns = []

    # Find the IP in args (simple check for IPv4 dotted-quad shape).
    for i, arg in enumerate(args):
        parts = arg.split('.')
        if len(parts) == 4 and all(p.isdigit() for p in parts):
            ip = arg
            file_patterns = args[:i]   # everything before the IP = file patterns
            keywords = args[i + 1:]    # everything after the IP = keyword filters
            break

    if not ip or not file_patterns:
        print("Error: Please provide at least one log file pattern and an IP address.")
        sys.exit(1)

    log_files = []
    for pattern in file_patterns:
        matched = glob.glob(pattern)
        if not matched:
            print(f"Warning: No files matched pattern: {pattern}")
        log_files.extend(matched)

    if not log_files:
        print("Error: No matching log files found.")
        sys.exit(1)

    total_matches = 0
    # NOTE: `--top 0` is falsy and therefore treated as "off", matching the
    # documented default; the counter is only allocated when it will be used.
    domain_counter = Counter() if top_limit else None

    for log_file in sorted(log_files):
        total_matches += process_file(log_file, ip, keywords, domain_counter, raw_mode)

    print(f"\nTotal matched entries across all files: {total_matches}")

    if top_limit and total_matches > 0:
        print_top_domains(domain_counter, top_limit)


if __name__ == "__main__":
    main()
|
Reference in New Issue
Block a user