#!/usr/bin/env python3 import argparse import mysql.connector import dns.resolver import datetime import csv import socket from dotenv import load_dotenv import os import sys from tqdm import tqdm import signal import logging import redis from tabulate import tabulate import xlsxwriter from collections import defaultdict import subprocess import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText import time # Redis - baza 5 redis_client = redis.Redis(host='localhost', port=6379, db=5, decode_responses=True) # Tymczasowe domeny TEMP_DOMAINS = { "10minutemail.com", "tempmail.com", "tempmail.net", "tempmail.org", "guerrillamail.com", "mailinator.com", "discard.email", "fakeinbox.com", "trashmail.com", "getnada.com", "yopmail.com", "maildrop.cc", "sharklasers.com" } # Logi logging.basicConfig( filename='user_cleanup.log', level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) def get_users(db_config): connection = mysql.connector.connect(**db_config) cursor = connection.cursor(dictionary=True) query = """ SELECT u.uid, u.name, u.mail, u.access, u.created, p.points, COUNT(n.nid) AS post_count FROM users u LEFT JOIN node n ON u.uid = n.uid LEFT JOIN userpoints p ON u.uid = p.uid WHERE u.uid > 0 GROUP BY u.uid """ cursor.execute(query) users = cursor.fetchall() cursor.close() connection.close() return users def is_fake_email(email): try: domain = email.split('@')[1] cache_key = f"mx:{domain}" cached = redis_client.get(cache_key) if cached is not None: return cached == "true" answers = dns.resolver.resolve(domain, 'MX', lifetime=5.0) result = "false" if answers else "true" except Exception: result = "true" redis_client.set(cache_key, result, ex=259200) return result == "true" def is_temp_email(email): try: domain = email.split('@')[1].lower() return domain in TEMP_DOMAINS except Exception: return False def export_to_csv(users): now = datetime.datetime.now().strftime("%Y-%m-%d_%H%M") filename = f"user_cleanup_results_{now}.csv" with open(filename, mode='w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(["UID", "Login", "E-mail", "Ostatnie logowanie", "Rejestracja", "Punkty", "Nieaktywny", "E-mail OK", "Tymczasowy"]) for u in users: writer.writerow([ u['uid'], u['name'], u['mail'], datetime.datetime.fromtimestamp(u['access']).strftime('%Y-%m-%d') if u['access'] else 'nigdy', datetime.datetime.fromtimestamp(u['created']).strftime('%Y-%m-%d') if u.get('created') else 'brak', u.get('points', 0), 'TAK' if u['inactive'] else 'NIE', 'TAK' if u['email_valid'] else 'NIE', 'TAK' if u['temp_email'] else 'NIE' ]) print(f"📁 CSV zapisany: {filename}") def export_to_excel(users): now = datetime.datetime.now().strftime("%Y-%m-%d_%H%M") filename = f"user_cleanup_results_{now}.xlsx" workbook = xlsxwriter.Workbook(filename) sheet = workbook.add_worksheet("Wyniki") headers = ["UID", "Login", "E-mail", "Ostatnie logowanie", "Rejestracja", "Punkty", "Nieaktywny", "E-mail OK", "Tymczasowy"] for col, header in enumerate(headers): sheet.write(0, col, header) for row_idx, u in enumerate(users, start=1): sheet.write(row_idx, 0, u['uid']) sheet.write(row_idx, 1, u['name']) sheet.write(row_idx, 2, u['mail']) sheet.write(row_idx, 3, datetime.datetime.fromtimestamp(u['access']).strftime('%Y-%m-%d') if u['access'] else 'nigdy') sheet.write(row_idx, 4, datetime.datetime.fromtimestamp(u['created']).strftime('%Y-%m-%d') if u.get('created') else 'brak') sheet.write(row_idx, 5, u.get('points', 0)) sheet.write(row_idx, 6, 'TAK' if u['inactive'] else 'NIE') sheet.write(row_idx, 7, 'TAK' if u['email_valid'] else 'NIE') sheet.write(row_idx, 8, 'TAK' if u['temp_email'] else 'NIE') workbook.close() print(f"📁 Excel zapisany: {filename}") def flush_redis_cache(): keys = redis_client.keys("mx:*") for key in keys: redis_client.delete(key) print(f"🧹 Redis MX cache wyczyszczony: {len(keys)} wpisów") def domain_report(users): domains = defaultdict(int) for u in users: domain = u['mail'].split('@')[1].lower() domains[domain] += 1 print("\n📊 Raport domen:") for domain, count in sorted(domains.items(), key=lambda x: x[1], reverse=True): print(f"- {domain}: {count} użytkowników") def delete_user_via_php(uid, drupal_path): try: result = subprocess.run( ['php', 'delete_user.php', str(uid), drupal_path], capture_output=True, text=True, check=True ) print(result.stdout.strip()) logging.info(f"PHP delete UID {uid}: {result.stdout.strip()}") except subprocess.CalledProcessError as e: logging.error(f"Błąd PHP delete UID {uid}: {e.stderr}") def confirm_delete(): answer = input("❗ Czy na pewno chcesz USUNĄĆ użytkowników? [tak/N]: ").strip().lower() if answer not in ("tak", "t", "yes", "y"): print("❌ Operacja anulowana.") sys.exit(0) def days_to_years(days): return round(days / 365, 1) def get_smtp_config(): config = { "host": os.getenv("SMTP_HOST"), "port": os.getenv("SMTP_PORT"), "user": os.getenv("SMTP_USER"), "password": os.getenv("SMTP_PASSWORD") } if not config["host"] or not config["port"]: raise ValueError("❌ Brakuje SMTP_HOST lub SMTP_PORT w .env") return config def send_email_batch(users, smtp_config, mails_per_pack=100, time_per_pack=60, dry_run=False): import os template_path = "mail_template.html" if not os.path.exists(template_path): print(f"❌ Brak pliku szablonu: {template_path}") return try: with open(template_path, "r", encoding="utf-8") as f: template = f.read() except Exception as e: print(f"❌ Błąd podczas odczytu szablonu HTML: {e}") return try: smtp = smtplib.SMTP(smtp_config["host"], int(smtp_config["port"])) smtp.ehlo() try: smtp.starttls() smtp.ehlo() except Exception: pass if smtp_config["user"] and smtp_config["password"]: smtp.login(smtp_config["user"], smtp_config["password"]) except Exception as e: print(f"❌ Błąd połączenia z serwerem SMTP: {e}") return print(f"📨 Rozpoczynam wysyłkę {len(users)} maili...") for i in tqdm(range(0, len(users), mails_per_pack), desc="Wysyłanie emaili"): batch = users[i:i + mails_per_pack] for user in batch: if dry_run: print(f"🔔 [TRYB TESTOWY] Mail do: {user['mail']}") continue try: msg = MIMEMultipart() from_email = smtp_config["user"] or "unitra@unitraklub.pl" msg['From'] = f"Unitra-Klub <{from_email}>" msg['To'] = user['mail'] msg['Subject'] = "Twoje konto w unitraklub.pl" body = template.replace("@user", user['name']).replace( "@rejestracja", datetime.datetime.fromtimestamp(user['created']).strftime('%Y-%m-%d')) msg.attach(MIMEText(body, 'html')) smtp.send_message(msg) time.sleep(0.5) # opcjonalne mikroopóźnienie między mailami except Exception as e: print(f"⚠️ Błąd wysyłki do {user['mail']}: {e}") if i + mails_per_pack < len(users): print(f"⏸ Przerwa {time_per_pack} sekund między paczkami...") time.sleep(time_per_pack) smtp.quit() print("✅ Wysyłka zakończona.") def validate_smtp_config(config): required = ['host', 'port', 'user', 'password'] for key in required: if not config.get(key): raise ValueError(f"❌ Brakuje wartości SMTP dla: {key}") def main(): signal.signal(signal.SIGINT, lambda s, f: sys.exit("\n🛑 Przerwano przez użytkownika.")) load_dotenv() try: smtp_config = get_smtp_config() except ValueError as e: print(str(e)) sys.exit(1) parser = argparse.ArgumentParser( description="Drupal 6 user cleanup tool", epilog=""" Przykłady użycia: # Podgląd nieaktywnych użytkowników bez punktów /root/user_manager/venv/bin/python3 app.py --days-inactive 730 --dry-run # Usuń użytkowników z błędnymi e-mailami i nieaktywnych 2+ lata /root/user_manager/venv/bin/python3 app.py --days-inactive 730 --delete # Uwzględnij starych użytkowników, którzy logowali się ostatnio /root/user_manager/venv/bin/python3 app.py --days-inactive 730 --veteran-year 2012 --recent-login-days 1095 # Tylko walidacja adresów e-mail /root/user_manager/venv/bin/python3 app.py --validate # Czyszczenie cache DNS w Redisie /root/user_manager/venv/bin/python3 app.py --flush-cache """, formatter_class=argparse.RawDescriptionHelpFormatter ) parser.add_argument('--host', help='Adres hosta bazy danych (można ustawić w .env jako DB_HOST)') parser.add_argument('--user', help='Użytkownik bazy danych (lub DB_USER z .env)') parser.add_argument('--password', help='Hasło do bazy danych (lub DB_PASSWORD z .env)') parser.add_argument('--database', help='Nazwa bazy danych (lub DB_NAME z .env)') parser.add_argument('--days-inactive', type=int, help='Minimalna liczba dni nieaktywności, po której użytkownik uznawany jest za nieaktywny') parser.add_argument('--dry-run', action='store_true', default=None, help='Tryb podglądu: nie wykonuje żadnych zmian, tylko raportuje') parser.add_argument('--delete', action='store_true', help='Usuń użytkowników, którzy spełniają kryteria filtrowania') parser.add_argument('--validate', action='store_true', help='Tylko sprawdź poprawność adresów e-mail (bez usuwania)') parser.add_argument('--flush-cache', action='store_true', help='Wyczyść cache rekordów MX w Redisie') parser.add_argument('--export-excel', action='store_true', help='Zapisz wynik filtrowania użytkowników także do pliku Excel (.xlsx)') parser.add_argument('--report-domains', action='store_true', help='Wygeneruj raport ilości użytkowników według domen e-mail') parser.add_argument('--veteran-year', type=int, default=2012, help='Rok, przed którym konto uznawane jest za stare/weterana (domyślnie: 2012)') parser.add_argument('--recent-login-days', type=int, default=1095, help='Ile dni wstecz uznaje się zalogowanego weterana za aktywnego (domyślnie: 1095)') parser.add_argument('--show-table', action='store_true', help='Wyświetl tabelę użytkowników spełniających kryteria do usunięcia') parser.add_argument('--drupal-path', help='Ścieżka do katalogu Drupala (można ustawić przez .env jako DRUPAL_PATH)') parser.add_argument('--send-test', metavar="EMAIL", help='Wyślij testowego maila z szablonu na wskazany adres e-mail') parser.add_argument('--send-mails', action='store_true', help='Wyślij powiadomienia e-mail do użytkowników z listy kandydatów') parser.add_argument('--mails-per-pack', type=int, default=100, help='Ile e-maili wysyłać w jednej paczce (domyślnie: 100)') parser.add_argument('--time-per-pack', type=int, default=60, help='Ile sekund czekać między paczkami maili (domyślnie: 60 sek.)') parser.add_argument('--only-invalid-emails', action='store_true', help='Usuń tylko użytkowników z nieprawidłowymi lub tymczasowymi adresami e-mail (bez sprawdzania aktywności)') args = parser.parse_args() if args.send_test: test_user = { 'name': 'Testowy Użytkownik', 'mail': args.send_test, 'created': int(datetime.datetime.now().timestamp()) - (86400 * 365 * 2) # 2 lata temu } print(f"📬 Wysyłka testowego maila na: {test_user['mail']}") send_email_batch([test_user], smtp_config, mails_per_pack=1, time_per_pack=0, dry_run=False) return if not args.drupal_path: args.drupal_path = os.getenv("DRUPAL_PATH") if args.delete is False and args.dry_run is None: args.dry_run = True if args.flush_cache: flush_redis_cache() return db_config = { 'host': args.host or os.getenv('DB_HOST'), 'user': args.user or os.getenv('DB_USER'), 'password': args.password or os.getenv('DB_PASSWORD'), 'database': args.database or os.getenv('DB_NAME') } users = get_users(db_config) now_ts = int(datetime.datetime.now().timestamp()) final_candidates = [] inactive_count = 0 invalid_email_count = 0 temp_email_count = 0 skipped_with_points = 0 skipped_veterans = 0 for user in tqdm(users, desc="Analiza"): if (user.get('points') or 0) > 0: skipped_with_points += 1 continue if (user.get('post_count') or 0) > 0: # Pomijamy użytkownika, który dodał treści continue user['temp_email'] = is_temp_email(user['mail']) user['email_valid'] = not is_fake_email(user['mail']) and not user['temp_email'] if args.only_invalid_emails: if not user['email_valid']: user['inactive'] = True final_candidates.append(user) continue # Pomijanie aktywnych "weteranów" created_year = datetime.datetime.fromtimestamp(user['created']).year if user.get('created') else None recent_login_threshold = now_ts - (args.recent_login_days * 86400) if created_year and created_year <= args.veteran_year: if user['access'] and user['access'] >= recent_login_threshold: skipped_veterans += 1 continue last_access = user['access'] or 0 user['inactive'] = (args.days_inactive is not None) and ((now_ts - last_access) > args.days_inactive * 86400) user['temp_email'] = is_temp_email(user['mail']) user['email_valid'] = not is_fake_email(user['mail']) and not user['temp_email'] if user['inactive']: inactive_count += 1 if not user['email_valid']: invalid_email_count += 1 if user['temp_email']: temp_email_count += 1 if args.validate or user['inactive'] or not user['email_valid']: final_candidates.append(user) # Redundant safety filter to exclude any with points final_candidates = [u for u in final_candidates if (u.get('points') or 0) == 0] if args.report_domains: domain_report(final_candidates) if args.show_table: print(tabulate([ [u['uid'], u['name'], u['mail'], datetime.datetime.fromtimestamp(u['access']).strftime('%Y-%m-%d') if u['access'] else 'nigdy', datetime.datetime.fromtimestamp(u['created']).strftime('%Y-%m-%d') if u.get('created') else 'brak', u.get('points', 0), 'TAK' if u['inactive'] else 'NIE', 'TAK' if u['email_valid'] else 'NIE', 'TAK' if u['temp_email'] else 'NIE'] for u in final_candidates ], headers=["UID", "Login", "E-mail", "Ostatnie log.", "Rejestracja", "Punkty", "Nieaktywny", "E-mail OK", "Tymczasowy"], tablefmt="fancy_grid")) export_to_csv(final_candidates) if args.export_excel: export_to_excel(final_candidates) if args.send_mails: send_email_batch(final_candidates, smtp_config, args.mails_per_pack, args.time_per_pack) print("\n📋 Parametry filtrowania:") if args.days_inactive: print(f"- Nieaktywni: brak logowania przez ≥ {args.days_inactive} dni (~{days_to_years(args.days_inactive)} lat)") print(f"- Weterani: konta zarejestrowane w roku ≤ {args.veteran_year}") print(f"- Pominięci weterani: logowanie w ciągu ostatnich ≤ {args.recent_login_days} dni (~{days_to_years(args.recent_login_days)} lat)") print("\n📊 Podsumowanie:") print(f"- Całkowita liczba użytkowników w bazie: {len(users)}") print(f"- Pominięci z punktami: {skipped_with_points}") print(f"- Nieaktywni (> {args.days_inactive} dni): {inactive_count}") print(f"- Z niepoprawnym e-mailem (MX lub tymczasowy): {invalid_email_count}") print(f"- Z tymczasowym e-mailem: {temp_email_count}") print(f"- Kandydaci do usunięcia: {len(final_candidates)}") print(f"- Pominięci jako aktywni weterani: {skipped_veterans}") if args.only_invalid_emails: print("- Tryb: tylko użytkownicy z nieprawidłowymi adresami e-mail") if args.delete: confirm_delete() if not args.drupal_path: print("❌ Brak parametru --drupal-path. Nie można usunąć użytkowników bez ścieżki do Drupala.") sys.exit(1) for u in tqdm(final_candidates, desc="Usuwanie użytkowników"): delete_user_via_php(u['uid'], args.drupal_path) print(f"✅ Usunięto {len(final_candidates)} użytkowników przez delete_user.php") if __name__ == '__main__': main()