Files
2025-05-24 23:00:30 +02:00

363 lines
13 KiB
Python

import os
import argparse
import pymysql
import hashlib
from PIL import Image
from config import DB_CONFIG, FILES_BASE_PATH
MAX_WIDTH = 1920
MAX_HEIGHT = 1080
MAX_FILESIZE_BYTES = 1 * 1024 * 1024 # 1MB
def get_connection():
return pymysql.connect(
host=DB_CONFIG['host'],
user=DB_CONFIG['user'],
password=DB_CONFIG['password'],
database=DB_CONFIG['database'],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
def get_image_resolution(path):
try:
with Image.open(path) as img:
return img.size # (width, height)
except Exception:
return (0, 0)
def hash_file(path):
hasher = hashlib.md5()
try:
with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(8192), b""):
hasher.update(chunk)
return hasher.hexdigest()
except Exception:
return None
def resize_image(path):
try:
with Image.open(path) as img:
width, height = img.size
if width <= MAX_WIDTH and height <= MAX_HEIGHT and os.path.getsize(path) <= MAX_FILESIZE_BYTES:
return False # Nie trzeba zmieniać
# Oblicz skalę proporcjonalnie, by zmieścić w max rozmiar i rozdzielczość
width_ratio = MAX_WIDTH / width
height_ratio = MAX_HEIGHT / height
scale_ratio = min(width_ratio, height_ratio, 1)
new_width = int(width * scale_ratio)
new_height = int(height * scale_ratio)
img = img.resize((new_width, new_height), Image.LANCZOS)
# Zapisz nadpisując, jakość 85%
img.save(path, optimize=True, quality=85)
return True
except Exception as e:
print(f"[ERROR] Nie udało się zmniejszyć {path}: {e}")
return False
def fetch_used_filepaths(cursor):
"""
Pobiera unikalne ścieżki plików (filepath) ze wszystkich tabel, które mają kolumnę 'filepath'.
Zwraca set ścieżek względnych (z '/' zamiast '\\').
"""
# Pobierz listę tabel w bazie
cursor.execute("SHOW TABLES")
tables = [row[f'Tables_in_{DB_CONFIG["database"]}'] for row in cursor.fetchall()]
filepaths = set()
for table in tables:
# Sprawdź kolumny danej tabeli
cursor.execute(f"SHOW COLUMNS FROM `{table}` LIKE 'filepath'")
if cursor.rowcount == 1:
# Tabela ma kolumnę 'filepath' — pobierz jej wartości
try:
cursor.execute(f"SELECT filepath FROM `{table}`")
rows = cursor.fetchall()
for row in rows:
path = row['filepath'].replace('\\', '/')
filepaths.add(path)
except Exception as e:
print(f"[WARN] Nie udało się pobrać danych z tabeli '{table}': {e}")
# Możesz tu też zdecydować, czy chcesz przerwać, czy tylko ostrzec
return filepaths
def file_exists_on_disk(base_path, rel_path):
# Normalizuj ścieżkę z bazy
rel_path_norm = rel_path.lstrip("/\\") # usuń początkowe ukośniki
full_path = os.path.normpath(os.path.join(base_path, rel_path_norm))
# Debug print, żeby zweryfikować jak łączy się ścieżka
# print(f"Sprawdzam plik: {full_path}")
return os.path.isfile(full_path), full_path
def compare_files(
debug=False,
to_optimize=False,
update_db=False,
dry_run=False,
extensions='jpg,jpeg,png,gif',
exclude_folders=None,
find_duplicates=False,
show_deleted=False,
top=20,
):
allowed_exts = tuple(f".{ext.strip().lower()}" for ext in extensions.split(','))
exclude_folders = exclude_folders or []
exclude_folders = [folder.rstrip('/\\') for folder in exclude_folders]
connection = get_connection()
cursor = connection.cursor()
# Pobierz pliki z bazy
cursor.execute("SELECT fid, filepath, filesize FROM files")
files = cursor.fetchall()
# Zbiór ścieżek plików używanych w systemie (dla duplikatów i wykrywania osieroconych)
used_filepaths = fetch_used_filepaths(cursor)
stats = {
'total': 0,
'missing': 0,
'mismatched': 0,
'saved_bytes_potential': 0,
'largest': [],
'highest_res': [],
'to_optimize': [],
'duplicates': [],
'deleted': [],
}
images_info = []
hash_map = {}
# Przetwarzanie plików z bazy
for file in files:
rel_path = file['filepath'].replace('\\', '/')
# Sprawdź wykluczenia folderów
if any(rel_path.startswith(excl + '/') or rel_path == excl for excl in exclude_folders):
if debug:
print(f"[POMINIĘTO] {rel_path} (folder wykluczony)")
continue
if not rel_path.lower().endswith(allowed_exts):
continue
fid = file['fid']
db_size = file['filesize']
full_path = os.path.join(FILES_BASE_PATH, rel_path)
stats['total'] += 1
if not os.path.isfile(full_path):
stats['missing'] += 1
if debug:
print(f"[BRAK] {rel_path}")
print(f"{full_path}")
continue
actual_size = os.path.getsize(full_path)
width, height = get_image_resolution(full_path)
file_hash = hash_file(full_path)
images_info.append({
'fid': fid,
'path': full_path,
'rel_path': rel_path,
'db_size': db_size,
'actual_size': actual_size,
'width': width,
'height': height,
'hash': file_hash,
})
# Rozmiar różniący się
if actual_size != db_size:
stats['mismatched'] += 1
diff = db_size - actual_size
if diff > 0:
stats['saved_bytes_potential'] += diff
if debug:
print(f"[ROZMIAR] {rel_path}")
print(f" → DB: {db_size} B, Dysk: {actual_size} B ({(db_size - actual_size)/1024:.1f} KB różnicy)")
if update_db and not dry_run:
update_sql = "UPDATE files SET filesize = %s WHERE fid = %s"
cursor.execute(update_sql, (actual_size, fid))
connection.commit()
elif update_db and dry_run:
delta_kb = (actual_size - db_size) / 1024
print(f"[DRY-RUN] {rel_path}")
print(f" DB: {db_size} B | FS: {actual_size} B | Δ: {delta_kb:+.1f} KB")
# Optymalizacja (np. zmniejszanie obrazów)
if to_optimize:
if (width > MAX_WIDTH or height > MAX_HEIGHT) or actual_size > MAX_FILESIZE_BYTES:
stats['to_optimize'].append({
'path': full_path,
'size': actual_size,
'res': f"{width}x{height}"
})
if update_db and not dry_run:
resized = resize_image(full_path)
if resized:
new_size = os.path.getsize(full_path)
# Aktualizuj bazę po zmianie
update_sql = "UPDATE files SET filesize = %s WHERE fid = %s"
cursor.execute(update_sql, (new_size, fid))
connection.commit()
if debug:
print(f"[ZMIENIONO] {rel_path}: zmniejszono do {new_size} B")
elif update_db and dry_run:
print(f"[DRY-RUN] Zmniejszyłbym {rel_path}")
# Mapowanie hashy do detekcji duplikatów
if find_duplicates and file_hash:
if file_hash not in hash_map:
hash_map[file_hash] = []
hash_map[file_hash].append({
'fid': fid,
'rel_path': rel_path,
'path': full_path,
'db_size': db_size,
'actual_size': actual_size,
'width': width,
'height': height,
})
# Największe pliki
stats['largest'] = sorted(images_info, key=lambda x: x['actual_size'], reverse=True)[:top]
# Najwyższa rozdzielczość
stats['highest_res'] = sorted(images_info, key=lambda x: x['width'] * x['height'], reverse=True)[:top]
# Szukanie duplikatów
if find_duplicates:
duplicates = []
for file_hash, files_list in hash_map.items():
if len(files_list) > 1:
# Sprawdź które pliki są używane w systemie
used = []
orphaned = []
for f in files_list:
if f['rel_path'] in used_filepaths:
used.append(f)
else:
orphaned.append(f)
duplicates.append({
'hash': file_hash,
'used': used,
'orphaned': orphaned,
})
stats['duplicates'] = duplicates
# Pokazywanie plików na dysku, których brak w bazie (usunięte w bazie)
if show_deleted:
disk_files = []
for root, _, files in os.walk(FILES_BASE_PATH):
# Sprawdź czy folder jest wykluczony
rel_root = os.path.relpath(root, FILES_BASE_PATH).replace('\\','/')
if any(rel_root.startswith(excl) for excl in exclude_folders):
continue
for f in files:
if not f.lower().endswith(allowed_exts):
continue
full_path = os.path.join(root, f)
rel_path = os.path.relpath(full_path, FILES_BASE_PATH).replace('\\', '/')
if rel_path not in used_filepaths:
disk_files.append(rel_path)
stats['deleted'] = disk_files
connection.close()
return stats
def print_summary(stats, to_optimize=False, find_duplicates=False, show_deleted=False, top=20):
print(f"\n📊 Podsumowanie analizy:")
print(f" 🔢 Liczba plików: {stats['total']}")
print(f" ❌ Brakujące pliki: {stats['missing']}")
print(f" 🛠 Rozmiary różniące się: {stats['mismatched']}")
print(f" 💾 Potencjalne oszczędności po zmniejszeniu: {stats['saved_bytes_potential'] / (1024**2):.2f} MB")
print(f"\n🖼 Największe obrazy (top {top}):")
for img in stats['largest']:
size_kb = img['actual_size'] / 1024
size_mb = size_kb / 1024
print(f" - {img['path']} ({size_kb:.1f} KB / {size_mb:.2f} MB)")
print(f"\n📐 Najwyższe rozdzielczości (top {top}):")
for img in stats['highest_res']:
print(f" - {img['path']} ({img['width']}x{img['height']})")
if to_optimize and stats['to_optimize']:
print(f"\n⚡ Pliki do optymalizacji ({len(stats['to_optimize'])}):")
for opt in stats['to_optimize']:
size_kb = opt['size'] / 1024
print(f" - {opt['path']} ({opt['res']}, {size_kb:.1f} KB)")
if find_duplicates and stats['duplicates']:
print(f"\n🔍 Duplikaty ({len(stats['duplicates'])} grup):")
for group in stats['duplicates']:
print(f" Hash: {group['hash']}")
if group['used']:
print(" ▶ Używane w systemie:")
for f in group['used']:
print(f" - {f['rel_path']}")
if group['orphaned']:
print(" ⚠️ Osierocone (nieużywane):")
for f in group['orphaned']:
print(f" - {f['rel_path']}")
if show_deleted and stats['deleted']:
print(f"\n❌ Pliki na dysku nieznalezione w bazie (usunięte?):")
for f in stats['deleted']:
print(f" - {f}")
def main():
parser = argparse.ArgumentParser(description="Analiza plików w Drupal 6")
parser.add_argument('--debug', action='store_true', help='Tryb debugowania')
parser.add_argument('--update-db', action='store_true', help='Aktualizuj rozmiary plików w bazie')
parser.add_argument('--dry-run', action='store_true', help='Symuluj zmiany bez zapisywania')
parser.add_argument('--extensions', default='jpg,jpeg,png,gif', help='Rozszerzenia plików do analizy')
parser.add_argument('--exclude-folders', nargs='*', default=[], help='Foldery do wykluczenia (względem files/)')
parser.add_argument('--find-duplicates', action='store_true', help='Znajdź i pokaż duplikaty')
parser.add_argument('--show-deleted', action='store_true', help='Pokaż pliki na dysku usunięte z bazy')
parser.add_argument('--optimize', action='store_true', help='Zmniejsz obrazy powyżej limitów')
parser.add_argument('--top', type=int, default=20, help='Ilość największych i najwyższych do pokazania')
args = parser.parse_args()
stats = compare_files(
debug=args.debug,
to_optimize=args.optimize,
update_db=args.update_db,
dry_run=args.dry_run,
extensions=args.extensions,
exclude_folders=args.exclude_folders,
find_duplicates=args.find_duplicates,
show_deleted=args.show_deleted,
top=args.top,
)
print_summary(
stats,
to_optimize=args.optimize,
find_duplicates=args.find_duplicates,
show_deleted=args.show_deleted,
top=args.top
)
if __name__ == '__main__':
main()