#!/usr/bin/env python3
"""Drupal 6 user cleanup tool.

Reads users from a MySQL database, flags accounts that are inactive and/or
have an e-mail address that looks undeliverable (no MX record, or a known
disposable-mail domain), exports the result to CSV (optionally XLSX) and,
when ``--delete`` is given, removes the flagged accounts by calling the
external ``delete_user.php`` script once per user.

MX lookup results are cached in Redis under ``mx:<domain>`` keys.
"""
import argparse
import csv
import datetime
import logging
import os
import signal
import socket
import subprocess
import sys
from collections import defaultdict

import dns.resolver
import mysql.connector
import redis
import xlsxwriter
from dotenv import load_dotenv
from tabulate import tabulate
from tqdm import tqdm

# Redis - database 5 (holds the MX lookup cache)
redis_client = redis.Redis(host='localhost', port=6379, db=5, decode_responses=True)

# Disposable / temporary e-mail domains
TEMP_DOMAINS = {
    "10minutemail.com", "tempmail.com", "tempmail.net", "tempmail.org",
    "guerrillamail.com", "mailinator.com", "discard.email", "fakeinbox.com",
    "trashmail.com", "getnada.com", "yopmail.com", "maildrop.cc",
    "sharklasers.com",
}

# Lifetime of a cached MX verdict, in seconds (3 days).
MX_CACHE_TTL = 259200

# Column headers shared by the CSV and Excel exports.
EXPORT_HEADERS = [
    "UID", "Login", "E-mail", "Ostatnie logowanie", "Rejestracja",
    "Punkty", "Nieaktywny", "E-mail OK", "Tymczasowy",
]

# Logs
logging.basicConfig(
    filename='user_cleanup.log',
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)


def get_users(db_config):
    """Fetch all users with uid > 0, with their points and node count.

    Args:
        db_config: keyword arguments passed to ``mysql.connector.connect``.

    Returns:
        A list of dict rows with keys ``uid``, ``name``, ``mail``, ``access``,
        ``created``, ``points`` and ``post_count``.
    """
    # NOTE(review): if `userpoints` can hold more than one row per uid, the
    # double LEFT JOIN inflates COUNT(n.nid) and makes p.points arbitrary
    # (non-aggregated column) - confirm against the actual schema.
    query = """
        SELECT u.uid, u.name, u.mail, u.access, u.created, p.points,
               COUNT(n.nid) AS post_count
        FROM users u
        LEFT JOIN node n ON u.uid = n.uid
        LEFT JOIN userpoints p ON u.uid = p.uid
        WHERE u.uid > 0
        GROUP BY u.uid
    """
    connection = mysql.connector.connect(**db_config)
    try:
        cursor = connection.cursor(dictionary=True)
        try:
            cursor.execute(query)
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the query fails.
        connection.close()


def _email_domain(email):
    """Return the lower-cased domain of *email*, or None if it has none."""
    if not isinstance(email, str):
        return None
    _, separator, domain = email.rpartition('@')
    domain = domain.strip().lower()
    return domain if separator and domain else None


def is_fake_email(email):
    """Return True when the domain of *email* appears unable to receive mail.

    The verdict for a domain is cached in Redis for ``MX_CACHE_TTL`` seconds.
    Only definitive DNS answers are cached; a lookup that fails for another
    reason (timeout, unreachable nameserver, ...) still returns True, as
    before, but is not cached, so the next run retries instead of inheriting
    a possibly wrong verdict for three days.
    """
    domain = _email_domain(email)
    if domain is None:
        # No usable domain: the address cannot be delivered to. Nothing to
        # cache (previously this path crashed on an unbound cache key).
        return True

    cache_key = f"mx:{domain}"
    try:
        cached = redis_client.get(cache_key)
    except redis.RedisError as exc:
        # A cache outage must not be mistaken for a missing MX record.
        logging.warning(f"Redis read failed for {cache_key}: {exc}")
        cached = None
    if cached is not None:
        return cached == "true"

    try:
        answers = dns.resolver.resolve(domain, 'MX', lifetime=5.0)
    except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
        fake = True
    except Exception as exc:
        logging.warning(f"MX lookup failed for {domain}: {exc}")
        return True
    else:
        fake = not answers

    try:
        redis_client.set(cache_key, "true" if fake else "false", ex=MX_CACHE_TTL)
    except redis.RedisError as exc:
        logging.warning(f"Redis write failed for {cache_key}: {exc}")
    return fake


def is_temp_email(email):
    """Return True when the domain of *email* is listed in TEMP_DOMAINS."""
    return _email_domain(email) in TEMP_DOMAINS


def _format_date(timestamp, missing):
    """Format a Unix timestamp as YYYY-MM-DD, or return *missing* if falsy."""
    if not timestamp:
        return missing
    return datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d')


def _user_row(u):
    """Build the display/export row for one analysed user dict."""
    return [
        u['uid'],
        u['name'],
        u['mail'],
        _format_date(u['access'], 'nigdy'),
        _format_date(u.get('created'), 'brak'),
        # `points` is None when the user has no userpoints row; show 0.
        u.get('points') or 0,
        'TAK' if u['inactive'] else 'NIE',
        'TAK' if u['email_valid'] else 'NIE',
        'TAK' if u['temp_email'] else 'NIE',
    ]


def export_to_csv(users):
    """Write *users* to a timestamped CSV file in the current directory."""
    now = datetime.datetime.now().strftime("%Y-%m-%d_%H%M")
    filename = f"user_cleanup_results_{now}.csv"
    with open(filename, mode='w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(EXPORT_HEADERS)
        writer.writerows(_user_row(u) for u in users)
    print(f"📁 CSV zapisany: {filename}")


def export_to_excel(users):
    """Write *users* to a timestamped XLSX file in the current directory."""
    now = datetime.datetime.now().strftime("%Y-%m-%d_%H%M")
    filename = f"user_cleanup_results_{now}.xlsx"
    # The context manager closes (and thereby saves) the workbook even when
    # writing a row raises.
    with xlsxwriter.Workbook(filename) as workbook:
        sheet = workbook.add_worksheet("Wyniki")
        sheet.write_row(0, 0, EXPORT_HEADERS)
        for row_idx, u in enumerate(users, start=1):
            sheet.write_row(row_idx, 0, _user_row(u))
    print(f"📁 Excel zapisany: {filename}")


def flush_redis_cache():
    """Delete every ``mx:*`` key from the Redis cache and report the count."""
    deleted = 0
    # SCAN iterates incrementally instead of blocking the server like KEYS.
    for key in redis_client.scan_iter(match="mx:*"):
        deleted += redis_client.delete(key)
    print(f"🧹 Redis MX cache wyczyszczony: {deleted} wpisów")


def domain_report(users):
    """Print how many of *users* belong to each e-mail domain."""
    domains = defaultdict(int)
    for u in users:
        # Candidates frequently have malformed or missing addresses; group
        # those under a placeholder instead of crashing.
        domain = _email_domain(u.get('mail')) or "(brak domeny)"
        domains[domain] += 1
    print("\n📊 Raport domen:")
    for domain, count in sorted(domains.items(), key=lambda x: x[1], reverse=True):
        print(f"- {domain}: {count} użytkowników")


def delete_user_via_php(uid, drupal_path):
    """Delete one user by running ``php delete_user.php <uid> <drupal_path>``.

    Returns:
        True when the PHP script exited with status 0, False otherwise
        (the failure is written to the log).
    """
    try:
        result = subprocess.run(
            ['php', 'delete_user.php', str(uid), drupal_path],
            capture_output=True, text=True, check=True
        )
    except subprocess.CalledProcessError as e:
        logging.error(f"Błąd PHP delete UID {uid}: {e.stderr}")
        return False
    except OSError as e:
        # e.g. the `php` executable is not installed / not on PATH.
        logging.error(f"Błąd PHP delete UID {uid}: {e}")
        return False
    output = result.stdout.strip()
    print(output)
    logging.info(f"PHP delete UID {uid}: {output}")
    return True


def confirm_delete():
    """Ask for interactive confirmation; exit with status 0 unless confirmed."""
    answer = input("❗ Czy na pewno chcesz USUNĄĆ użytkowników? [tak/N]: ").strip().lower()
    if answer not in ("tak", "t", "yes", "y"):
        print("❌ Operacja anulowana.")
        sys.exit(0)


def days_to_years(days):
    """Convert a number of days to years, rounded to one decimal place."""
    return round(days / 365, 1)


def _build_parser():
    """Create the command-line argument parser."""
    parser = argparse.ArgumentParser(
        description="Drupal 6 user cleanup tool",
        epilog="""
Przykłady użycia:

  # Podgląd nieaktywnych użytkowników bez punktów
  /root/user_manager/venv/bin/python3 app.py --days-inactive 730 --dry-run

  # Usuń użytkowników z błędnymi e-mailami i nieaktywnych 2+ lata
  /root/user_manager/venv/bin/python3 app.py --days-inactive 730 --delete

  # Uwzględnij starych użytkowników, którzy logowali się ostatnio
  /root/user_manager/venv/bin/python3 app.py --days-inactive 730 --veteran-year 2012 --recent-login-days 1095

  # Tylko walidacja adresów e-mail
  /root/user_manager/venv/bin/python3 app.py --validate

  # Czyszczenie cache DNS w Redisie
  /root/user_manager/venv/bin/python3 app.py --flush-cache
""",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument('--host')
    parser.add_argument('--user')
    parser.add_argument('--password')
    parser.add_argument('--database')
    parser.add_argument('--days-inactive', type=int)
    parser.add_argument('--dry-run', action='store_true', default=None,
                        help='Tryb podglądu (domyślny jeśli nie podano --delete)')
    parser.add_argument('--delete', action='store_true')
    parser.add_argument('--validate', action='store_true')
    parser.add_argument('--flush-cache', action='store_true')
    parser.add_argument('--export-excel', action='store_true')
    parser.add_argument('--report-domains', action='store_true')
    parser.add_argument('--veteran-year', type=int, default=2012,
                        help='Minimalny rok rejestracji konta do uznania za stare (domyślnie: 2012)')
    parser.add_argument('--recent-login-days', type=int, default=1095,
                        help='Ile dni wstecz ostatnie logowanie czyni konto aktywnym (domyślnie: 1095)')
    parser.add_argument('--show-table', action='store_true',
                        help='Wyświetl tabelę z listą użytkowników do usunięcia')
    parser.add_argument('--drupal-path',
                        help="Ścieżka do katalogu Drupala (można też podać przez .env jako DRUPAL_PATH)")
    return parser


def _analyze_users(users, args, now_ts):
    """Classify *users* and return ``(candidates, stats)``.

    Each analysed user dict gains the keys ``inactive``, ``temp_email`` and
    ``email_valid``. *stats* maps counter names to integers.
    """
    stats = {
        'inactive': 0,
        'invalid_email': 0,
        'temp_email': 0,
        'skipped_with_points': 0,
        'skipped_veterans': 0,
    }
    candidates = []
    # Loop invariants, computed once.
    recent_login_threshold = now_ts - (args.recent_login_days * 86400)
    inactive_after = None if args.days_inactive is None else args.days_inactive * 86400

    for user in tqdm(users, desc="Analiza"):
        if (user.get('points') or 0) > 0:
            stats['skipped_with_points'] += 1
            continue
        if (user.get('post_count') or 0) > 0:
            # Skip users who have authored content.
            continue

        # Skip active "veterans": old accounts that logged in recently.
        created = user.get('created')
        created_year = datetime.datetime.fromtimestamp(created).year if created else None
        if created_year and created_year <= args.veteran_year:
            if user['access'] and user['access'] >= recent_login_threshold:
                stats['skipped_veterans'] += 1
                continue

        # An account that never logged in has access == 0; measure its
        # inactivity from the registration date rather than from 1970 so a
        # freshly registered account is not flagged as years-inactive.
        last_seen = max(user['access'] or 0, created or 0)
        user['inactive'] = inactive_after is not None and (now_ts - last_seen) > inactive_after
        user['temp_email'] = is_temp_email(user['mail'])
        # Short-circuit: no DNS lookup is needed for a known temp domain.
        user['email_valid'] = not user['temp_email'] and not is_fake_email(user['mail'])

        if user['inactive']:
            stats['inactive'] += 1
        if not user['email_valid']:
            stats['invalid_email'] += 1
        if user['temp_email']:
            stats['temp_email'] += 1

        if args.validate or user['inactive'] or not user['email_valid']:
            candidates.append(user)

    # Redundant safety filter to exclude any with points
    candidates = [u for u in candidates if (u.get('points') or 0) == 0]
    return candidates, stats


def _print_summary(args, users, candidates, stats):
    """Print the filtering parameters and the result counters."""
    print("\n📋 Parametry filtrowania:")
    if args.days_inactive:
        print(f"- Nieaktywni: brak logowania przez ≥ {args.days_inactive} dni (~{days_to_years(args.days_inactive)} lat)")
    print(f"- Weterani: konta zarejestrowane w roku ≤ {args.veteran_year}")
    print(f"- Pominięci weterani: logowanie w ciągu ostatnich ≤ {args.recent_login_days} dni (~{days_to_years(args.recent_login_days)} lat)")

    print("\n📊 Podsumowanie:")
    print(f"- Całkowita liczba użytkowników w bazie: {len(users)}")
    print(f"- Pominięci z punktami: {stats['skipped_with_points']}")
    print(f"- Nieaktywni (> {args.days_inactive} dni): {stats['inactive']}")
    print(f"- Z niepoprawnym e-mailem (MX lub tymczasowy): {stats['invalid_email']}")
    print(f"- Z tymczasowym e-mailem: {stats['temp_email']}")
    print(f"- Kandydaci do usunięcia: {len(candidates)}")
    print(f"- Pominięci jako aktywni weterani: {stats['skipped_veterans']}")


def _delete_users(candidates, args):
    """Run the confirmed deletion phase for the flagged *candidates*."""
    if args.dry_run:
        # An explicit --dry-run always wins over --delete.
        print("ℹ️ Podano --dry-run: pomijam usuwanie użytkowników.")
        return
    # Validate prerequisites before asking the operator to confirm.
    if not args.drupal_path:
        print("❌ Brak parametru --drupal-path. Nie można usunąć użytkowników bez ścieżki do Drupala.")
        sys.exit(1)

    # With --validate every analysed user is listed; only ever delete the
    # ones that were actually flagged as inactive or with a bad e-mail.
    to_delete = [u for u in candidates if u['inactive'] or not u['email_valid']]
    if not to_delete:
        print("ℹ️ Brak użytkowników do usunięcia.")
        return

    confirm_delete()
    deleted = 0
    for u in tqdm(to_delete, desc="Usuwanie użytkowników"):
        if delete_user_via_php(u['uid'], args.drupal_path):
            deleted += 1
    print(f"✅ Usunięto {deleted} użytkowników przez delete_user.php")
    failed = len(to_delete) - deleted
    if failed:
        print(f"⚠️ Nie udało się usunąć {failed} użytkowników (szczegóły w user_cleanup.log)")


def main():
    """Command-line entry point: analyse, export, report, optionally delete."""
    signal.signal(signal.SIGINT, lambda s, f: sys.exit("\n🛑 Przerwano przez użytkownika."))
    load_dotenv()

    args = _build_parser().parse_args()

    if not args.drupal_path:
        args.drupal_path = os.getenv("DRUPAL_PATH")

    if args.delete is False and args.dry_run is None:
        args.dry_run = True

    if args.flush_cache:
        flush_redis_cache()
        return

    db_config = {
        'host': args.host or os.getenv('DB_HOST'),
        'user': args.user or os.getenv('DB_USER'),
        'password': args.password or os.getenv('DB_PASSWORD'),
        'database': args.database or os.getenv('DB_NAME')
    }

    users = get_users(db_config)
    now_ts = int(datetime.datetime.now().timestamp())
    final_candidates, stats = _analyze_users(users, args, now_ts)

    if args.report_domains:
        domain_report(final_candidates)

    if args.show_table:
        print(tabulate(
            [_user_row(u) for u in final_candidates],
            headers=["UID", "Login", "E-mail", "Ostatnie log.", "Rejestracja",
                     "Punkty", "Nieaktywny", "E-mail OK", "Tymczasowy"],
            tablefmt="fancy_grid"
        ))

    export_to_csv(final_candidates)
    if args.export_excel:
        export_to_excel(final_candidates)

    _print_summary(args, users, final_candidates, stats)

    if args.delete:
        _delete_users(final_candidates, args)


if __name__ == '__main__':
    main()