#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ PPTX Grafik-Komprimier-Tool (nur CaesiumCLT, Multi-Thread, Batch, sauberes Cleanup) Version: 1.1.6 Highlights: - Caesium-Scratch außerhalb des PPTX-Arbeitsverzeichnisses -> keine Tempfiles in finaler PPTX - Safety-Cleanup: entfernt 'caesium*' Ordner und '*.tmp' in ppt/media, bevor gezippt wird - Overwrite Policy: -O bigger - Log: image_name,size_before,size_after,saving,saving_percent - Summary inkl. Zeit benötigt Änderungen in 1.1.6: - Libcaesium 1.3.0 kann nun auch files ignorieren, wenn die Kompression kleiner als ist """ import argparse import os import re import xml.etree.ElementTree as ET import sys import zipfile import tempfile import shutil import subprocess import time import fnmatch from glob import glob from pathlib import Path from datetime import timedelta from concurrent.futures import ThreadPoolExecutor, as_completed from threading import Lock from dataclasses import dataclass from typing import Callable, List, Optional __version__ = "1.1.6" ALLOWED_EXT = {".jpg", ".jpeg", ".png", ".webp", ".gif"} PROGRESS_BAR_LEN = 40 TEMP_PREFIX = "pptx_compress_" DEFAULT_MIN_SAVINGS = "2%" @dataclass class DeckResult: input: str output: str ok: bool = False size_before: int = 0 size_after: int = 0 elapsed_sec: float = 0.0 error: Optional[str] = None log_file: Optional[str] = None @dataclass class ImageProcessResult: image_name: str orig_size: int chosen_size: int slide_nr: str def discover_images(media_dir: Path) -> list[Path]: images: list[Path] = [] if media_dir.exists(): for f in sorted(media_dir.iterdir()): if f.is_file() and f.suffix.lower() in ALLOWED_EXT: images.append(f) return images def image_result_to_log_line(image_result: ImageProcessResult) -> str: saving = image_result.orig_size - image_result.chosen_size saving_percent = round((saving / image_result.orig_size) * 100, 2) if image_result.orig_size > 0 else 0.0 return f"{image_result.image_name};{human_kb(image_result.orig_size)};{human_kb(image_result.chosen_size)};{human_kb(saving)};{saving_percent};{image_result.slide_nr}\n" # -------------------- Utilities -------------------- def human_mb(nbytes: int) -> float: return round(nbytes / (1024 * 1024), 2) def human_kb(nbytes: int) -> float: return round(nbytes / 1024,2) def ensure_clean_file(path: Path): if path.exists(): try: if path.is_file(): path.unlink() else: shutil.rmtree(path, ignore_errors=True) except Exception: pass def cleanup_old_temps(): tmp_root = Path(tempfile.gettempdir()) for p in tmp_root.glob(f"{TEMP_PREFIX}*"): try: if p.is_dir(): shutil.rmtree(p, ignore_errors=True) else: p.unlink(missing_ok=True) except Exception: pass def print_progress(i: int, total: int): if total <= 0: return done = int(PROGRESS_BAR_LEN * i / total) bar = "█" * done + "-" * (PROGRESS_BAR_LEN - done) pct = int(i * 100 / total) print(f"\rBilder: |{bar}| {i}/{total} ({pct}%)", end="", flush=True) def zip_dir_to_pptx(src_dir: Path, out_pptx: Path): with zipfile.ZipFile(out_pptx, "w", compression=zipfile.ZIP_DEFLATED) as z: for root, _, files in os.walk(src_dir): for f in files: full = Path(root) / f rel = full.relative_to(src_dir) z.write(full, arcname=str(rel)) def which(cmd: str): return shutil.which(cmd) def compress_with_caesium(original: Path, out_dir: Path, caesium_threads: int | None, quality: int, min_savings: str) -> Path | None: exe = which("caesiumclt") if not exe: raise RuntimeError("[ERROR] 'caesiumclt' wurde nicht gefunden. Bitte CaesiumCLT installieren und in PATH verfügbar machen.") out_dir.mkdir(parents=True, exist_ok=True) ext = original.suffix.lower() if ext not in {".jpg", ".jpeg", ".png", ".webp", ".gif"}: return None cmd = [exe, "-q", str(quality), "-O", "bigger", "--min-savings", min_savings, "-o", str(out_dir)] if caesium_threads is not None: cmd += ["--threads", str(caesium_threads)] cmd += [str(original)] try: r = subprocess.run(cmd, capture_output=True, text=True) if r.returncode != 0: sys.stderr.write(f"[caesiumclt] Fehler bei {original.name}:{r.stderr}") return None out_file = out_dir / original.name return out_file if out_file.exists() else None except Exception as ex: sys.stderr.write(f"[caesiumclt] Ausnahme bei {original.name}: {ex}") return None def format_duration(seconds: float) -> str: total_ms = int(round(seconds * 1000)) td = timedelta(milliseconds=total_ms) base = str(td) if "." in base: hms, frac = base.split(".", 1) return f"{hms}.{frac[:2]}" return base def build_image_slide_index(rels_dir: Path) -> dict[str, List[int]]: if not rels_dir.exists() or not rels_dir.is_dir(): return {} image_to_slides: dict[str, set[int]] = {} for rels_path in rels_dir.iterdir(): rels_file = rels_path.name if rels_file.startswith("slide") and rels_file.endswith(".xml.rels") and rels_path.is_file(): match = re.search(r"slide(\d+)\.xml\.rels$", rels_file) if not match: continue slide_number = int(match.group(1)) try: tree = ET.parse(rels_path) root = tree.getroot() for rel in root.findall(".//{http://schemas.openxmlformats.org/package/2006/relationships}Relationship"): target = rel.attrib.get("Target", "") image_name = Path(target).name if image_name: if image_name not in image_to_slides: image_to_slides[image_name] = set() image_to_slides[image_name].add(slide_number) except (ET.ParseError, OSError): print(f"Fehler beim Lesen von {rels_file}") return {img: sorted(slides) for img, slides in image_to_slides.items()} def get_slide_numbers_for_image(rels_dir: Path, image_filename: str) -> Optional[List[int]]: image_to_slides = build_image_slide_index(rels_dir) slides = image_to_slides.get(image_filename) return slides if slides else None def process_image_file( idx: int, img_path: Path, scratch_dir: Path, image_to_slides: dict[str, List[int]], caesium_threads: int | None, quality: int, min_savings: str, compressor: Callable[[Path, Path, int | None, int, str], Path | None], ) -> ImageProcessResult: orig_size = img_path.stat().st_size chosen_size = orig_size found_in_slide = image_to_slides.get(img_path.name) slide_nr = "NOT_USED" if found_in_slide is None else str(found_in_slide) try: out_sub = scratch_dir / f"img_{idx:06d}" caesium_out = compressor(img_path, out_sub, caesium_threads, quality, min_savings) if caesium_out and caesium_out.exists(): s = caesium_out.stat().st_size if s < orig_size: tmp_target = img_path.with_suffix(img_path.suffix + ".tmp") shutil.copy2(caesium_out, tmp_target) tmp_target.replace(img_path) chosen_size = s except Exception: chosen_size = orig_size return ImageProcessResult( image_name=img_path.name, orig_size=orig_size, chosen_size=chosen_size, slide_nr=slide_nr, ) # -------------------- Core per-deck processing -------------------- def process_single_deck( input_pptx: Path, output_pptx: Path, threads: int, quality: int, min_savings: str, compressor: Callable[[Path, Path, int | None, int, str], Path | None] = compress_with_caesium, ) -> DeckResult: start_time = time.perf_counter() result = DeckResult( input=str(input_pptx), output=str(output_pptx), ) try: if not input_pptx.exists() or input_pptx.suffix.lower() != ".pptx": raise ValueError("Eingabedatei existiert nicht oder ist keine .pptx") cleanup_old_temps() ensure_clean_file(output_pptx) work_dir = Path(tempfile.mkdtemp(prefix=TEMP_PREFIX + "work_")) scratch_dir = Path(tempfile.mkdtemp(prefix=TEMP_PREFIX + "scratch_")) log_file = output_pptx.with_suffix(".log.csv") ensure_clean_file(log_file) log_lines = ["image_name;size_before(kb);size_after(kb);saving(kb);saving_percent(%);in_slide_number\n"] size_before = input_pptx.stat().st_size result.size_before = size_before with zipfile.ZipFile(input_pptx, "r") as z: z.extractall(work_dir) slides_dir = work_dir / "ppt" / "slides" rels_dir = slides_dir / "_rels" media_dir = work_dir / "ppt" / "media" images = discover_images(media_dir) total = len(images) print(f"[Processing] {input_pptx.name}: {total} Bild(er) gefunden") print_progress(0, total) if not which("caesiumclt") and compressor is compress_with_caesium: raise RuntimeError("'caesiumclt' nicht gefunden. Bitte installieren und in PATH verfügbar machen.") caesium_threads = 1 if threads and threads > 1 else None lock = Lock() done_count = 0 image_to_slides = build_image_slide_index(rels_dir) def worker(idx: int, img_path: Path): nonlocal done_count image_result = process_image_file( idx=idx, img_path=img_path, scratch_dir=scratch_dir, image_to_slides=image_to_slides, caesium_threads=caesium_threads, quality=quality, min_savings=min_savings, compressor=compressor, ) with lock: log_lines.append(image_result_to_log_line(image_result)) done_count += 1 print_progress(done_count, total) if total > 0: with ThreadPoolExecutor(max_workers=max(1, threads)) as ex: futures = [ex.submit(worker, i, p) for i, p in enumerate(images, start=1)] for _ in as_completed(futures): pass print() # newline # Safety cleanup inside work_dir for p in work_dir.rglob("*"): try: if p.is_dir() and p.name.lower().startswith("caesium"): shutil.rmtree(p, ignore_errors=True) except Exception: pass if media_dir.exists(): for f in media_dir.iterdir(): if f.is_file() and f.suffix.lower() == ".tmp": try: f.unlink(missing_ok=True) except Exception: pass zip_dir_to_pptx(work_dir, output_pptx) size_after = output_pptx.stat().st_size result.size_after = size_after try: with open(log_file, "w", encoding="utf-8") as f: f.writelines(log_lines) except Exception: pass elapsed = time.perf_counter() - start_time result.elapsed_sec = elapsed result.log_file = str(log_file) result.ok = True savings_pct = 0.0 if size_before == 0 else round(100.0 * (size_before - size_after) / size_before, 2) print(f"[OK] Fertig! ({input_pptx.name})") print("Zusammenfassung ----------------") print(" Vorher: ", human_mb(size_before), "MB") print(" Nachher: ", human_mb(size_after), "MB") print(" Ersparnis: ", f"{savings_pct}%") print(" Zeit: ", format_duration(elapsed)) print(" Log: ", log_file) except Exception as e: result.error = str(e) finally: try: shutil.rmtree(work_dir, ignore_errors=True) # type: ignore[name-defined] except Exception: pass try: shutil.rmtree(scratch_dir, ignore_errors=True) # type: ignore[name-defined] except Exception: pass cleanup_old_temps() return result # -------------------- Input helpers -------------------- def expand_inputs(inputs: list[str]) -> list[Path]: files: list[Path] = [] for inp in inputs: p = Path(inp) if any(ch in inp for ch in ['*', '?']): for g in glob(inp): if g.lower().endswith('.pptx'): files.append(Path(g).resolve()) else: if p.is_dir(): for g in p.glob('*.pptx'): files.append(g.resolve()) else: if p.suffix.lower() == '.pptx': files.append(p.resolve()) seen = set() uniq = [] for f in files: if str(f) not in seen: uniq.append(f) seen.add(str(f)) return uniq def collect_from_dir(input_dir: Path, pattern: str, recursive: bool) -> list[Path]: files: list[Path] = [] if recursive: for root, _, names in os.walk(input_dir): for n in names: if fnmatch.fnmatch(n, pattern): p = Path(root) / n if p.suffix.lower() == '.pptx': files.append(p.resolve()) else: for p in input_dir.glob(pattern): if p.suffix.lower() == '.pptx': files.append(p.resolve()) seen = set() out = [] for f in files: s = str(f) if s not in seen: out.append(f) seen.add(s) return out # -------------------- CLI -------------------- def main(): parser = argparse.ArgumentParser( description="PPTX Grafik-Komprimier-Tool (nur CaesiumCLT, Multi-Thread, Batch, sauberes Cleanup)", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument('-i','--input', nargs='*', help='Input-PPTX (eine oder mehrere, Wildcards erlaubt). Bei mehreren: -O erforderlich.') parser.add_argument('--input-dir', help='Eingabe-Verzeichnis (optional, für Batch)') parser.add_argument('-o','--output', help='Output-PPTX (nur Single-Mode)') parser.add_argument('-O','--output-dir', help='Output-Verzeichnis (erforderlich für Batch)') parser.add_argument('--pattern', default='*.pptx', help='Dateimuster für --input-dir') parser.add_argument('--recursive', action='store_true', help='Rekursiv in --input-dir suchen') #parser.add_argument('-t','--threads', type=int, default=min(32, os.cpu_count() or 4), help='Anzahl paralleler Threads pro Datei') parser.add_argument('-t','--threads', type=int, default=16, help='Anzahl paralleler Threads pro Datei') parser.add_argument('-q','--quality', type=int, default=90, help='Qualität für caesiumclt (0..100), höher = bessere Qualität / größere Datei') parser.add_argument('--min-savings', default=DEFAULT_MIN_SAVINGS, help="Mindestersparnis für caesiumclt (z. B. 2%%, 100KB, 1MB oder Bytes als Zahl)") parser.add_argument('--version', action='version', version=f'%(prog)s {__version__}', help="Zeigt die Versionsnummer an" ) args = parser.parse_args() print("Threads used: ", args.threads," Threads") if args.quality < 0 or args.quality > 100: print('[ERROR] Ungültige Qualität. Erlaubt: 0..100') sys.exit(1) input_files: list[Path] = [] if args.input: input_files.extend(expand_inputs(args.input)) if args.input_dir: input_files.extend(collect_from_dir(Path(args.input_dir), args.pattern, args.recursive)) if len(input_files) == 0: parser.print_help() sys.exit(1) batch_mode = len(input_files) > 1 if batch_mode and not args.output_dir: print('[ERROR] Batch-Modus erkannt. Bitte -O/--output-dir angeben.') sys.exit(2) if not which('caesiumclt'): print("[ERROR] 'caesiumclt' nicht gefunden. Bitte installieren und in PATH verfügbar machen.") sys.exit(3) overall_before = 0 overall_after = 0 successes = 0 failures = 0 if batch_mode: out_dir = Path(args.output_dir).resolve() out_dir.mkdir(parents=True, exist_ok=True) print(f"Batch: {len(input_files)} Datei(en). Output-Verzeichnis: {out_dir}") for src in input_files: if not src.exists(): print(f"- Übersprungen (nicht gefunden): {src}") failures += 1 continue dst = out_dir / f"{src.stem}_compressed.pptx" res = process_single_deck(src, dst, args.threads, args.quality, args.min_savings) if res.ok: successes += 1 overall_before += res.size_before overall_after += res.size_after else: failures += 1 print(f" Fehler: {src.name} -> {res.error}") else: src = input_files[0] if args.output_dir: Path(args.output_dir).mkdir(parents=True, exist_ok=True) dst = Path(args.output_dir) / f"{src.stem}_compressed.pptx" else: dst = Path(args.output).resolve() if args.output else src.with_name(f"{src.stem}_compressed.pptx") res = process_single_deck(src, dst, args.threads, args.quality, args.min_savings) if res.ok: successes += 1 overall_before += res.size_before overall_after += res.size_after else: failures += 1 print(f" Fehler: {src.name} -> {res.error}") if batch_mode: print(f"====== Gesamt-Summary ======") print(f"[SUCCESS] Dateien erfolgreich: {successes}") if failures > 0: print(f"[FAILED] Dateien fehlgeschlagen: {failures}") if overall_before > 0: pct = round(100.0 * (overall_before - overall_after) / overall_before, 2) else: pct = 0.0 print(f"Gesamtgröße vorher: {human_mb(overall_before)} MB") print(f"Gesamtgröße nachher: {human_mb(overall_after)} MB") print(f"Gesamt-Ersparnis: {pct}%") if __name__ == '__main__': main()