Files
pptx-image-compress/pptx_image_compress.py

468 lines
17 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
PPTX Grafik-Komprimier-Tool (nur CaesiumCLT, Multi-Thread, Batch, sauberes Cleanup)
Version: 1.1.6
Highlights:
- Caesium-Scratch außerhalb des PPTX-Arbeitsverzeichnisses -> keine Tempfiles in finaler PPTX
- Safety-Cleanup: entfernt 'caesium*' Ordner und '*.tmp' in ppt/media, bevor gezippt wird
- Overwrite Policy: -O bigger
- Log: image_name,size_before,size_after,saving,saving_percent
- Summary inkl. Zeit benötigt
Änderungen in 1.1.6:
- Libcaesium 1.3.0 kann nun auch files ignorieren, wenn die Kompression kleiner als <MIN_SAVING> ist
"""
import argparse
import os
import re
import xml.etree.ElementTree as ET
import sys
import zipfile
import tempfile
import shutil
import subprocess
import time
import fnmatch
from glob import glob
from pathlib import Path
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from typing import List, Optional
__version__ = "1.1.6"
ALLOWED_EXT = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
PROGRESS_BAR_LEN = 40
TEMP_PREFIX = "pptx_compress_"
DEFAULT_MIN_SAVINGS = "2%"
# -------------------- Utilities --------------------
def human_mb(nbytes: int) -> float:
return round(nbytes / (1024 * 1024), 2)
def human_kb(nbytes: int) -> float:
return round(nbytes / 1024,2)
def ensure_clean_file(path: Path):
if path.exists():
try:
if path.is_file():
path.unlink()
else:
shutil.rmtree(path, ignore_errors=True)
except Exception:
pass
def cleanup_old_temps():
tmp_root = Path(tempfile.gettempdir())
for p in tmp_root.glob(f"{TEMP_PREFIX}*"):
try:
if p.is_dir():
shutil.rmtree(p, ignore_errors=True)
else:
p.unlink(missing_ok=True)
except Exception:
pass
def print_progress(i: int, total: int):
if total <= 0:
return
done = int(PROGRESS_BAR_LEN * i / total)
bar = "" * done + "-" * (PROGRESS_BAR_LEN - done)
pct = int(i * 100 / total)
print(f"\rBilder: |{bar}| {i}/{total} ({pct}%)", end="", flush=True)
def zip_dir_to_pptx(src_dir: Path, out_pptx: Path):
with zipfile.ZipFile(out_pptx, "w", compression=zipfile.ZIP_DEFLATED) as z:
for root, _, files in os.walk(src_dir):
for f in files:
full = Path(root) / f
rel = full.relative_to(src_dir)
z.write(full, arcname=str(rel))
def which(cmd: str):
return shutil.which(cmd)
def compress_with_caesium(original: Path, out_dir: Path, caesium_threads: int | None, quality: int, min_savings: str) -> Path | None:
exe = which("caesiumclt")
if not exe:
raise RuntimeError("[ERROR] 'caesiumclt' wurde nicht gefunden. Bitte CaesiumCLT installieren und in PATH verfügbar machen.")
out_dir.mkdir(parents=True, exist_ok=True)
ext = original.suffix.lower()
if ext not in {".jpg", ".jpeg", ".png", ".webp", ".gif"}:
return None
cmd = [exe, "-q", str(quality), "-O", "bigger", "--min-savings", min_savings, "-o", str(out_dir)]
if caesium_threads is not None:
cmd += ["--threads", str(caesium_threads)]
cmd += [str(original)]
try:
r = subprocess.run(cmd, capture_output=True, text=True)
if r.returncode != 0:
sys.stderr.write(f"[caesiumclt] Fehler bei {original.name}:{r.stderr}")
return None
out_file = out_dir / original.name
return out_file if out_file.exists() else None
except Exception as ex:
sys.stderr.write(f"[caesiumclt] Ausnahme bei {original.name}: {ex}")
return None
def format_duration(seconds: float) -> str:
total_ms = int(round(seconds * 1000))
td = timedelta(milliseconds=total_ms)
base = str(td)
if "." in base:
hms, frac = base.split(".", 1)
return f"{hms}.{frac[:2]}"
return base
def build_image_slide_index(rels_dir: Path) -> dict[str, List[int]]:
if not rels_dir.exists() or not rels_dir.is_dir():
return {}
image_to_slides: dict[str, set[int]] = {}
for rels_path in rels_dir.iterdir():
rels_file = rels_path.name
if rels_file.startswith("slide") and rels_file.endswith(".xml.rels") and rels_path.is_file():
match = re.search(r"slide(\d+)\.xml\.rels$", rels_file)
if not match:
continue
slide_number = int(match.group(1))
try:
tree = ET.parse(rels_path)
root = tree.getroot()
for rel in root.findall(".//{http://schemas.openxmlformats.org/package/2006/relationships}Relationship"):
target = rel.attrib.get("Target", "")
image_name = Path(target).name
if image_name:
if image_name not in image_to_slides:
image_to_slides[image_name] = set()
image_to_slides[image_name].add(slide_number)
except (ET.ParseError, OSError):
print(f"Fehler beim Lesen von {rels_file}")
return {img: sorted(slides) for img, slides in image_to_slides.items()}
def get_slide_numbers_for_image(rels_dir: Path, image_filename: str) -> Optional[List[int]]:
image_to_slides = build_image_slide_index(rels_dir)
slides = image_to_slides.get(image_filename)
return slides if slides else None
# -------------------- Core per-deck processing --------------------
def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quality: int, min_savings: str) -> dict:
start_time = time.perf_counter()
result = {
"input": str(input_pptx),
"output": str(output_pptx),
"ok": False,
"size_before": 0,
"size_after": 0,
"elapsed_sec": 0.0,
"error": None,
"log_file": None,
}
try:
if not input_pptx.exists() or input_pptx.suffix.lower() != ".pptx":
raise ValueError("Eingabedatei existiert nicht oder ist keine .pptx")
cleanup_old_temps()
ensure_clean_file(output_pptx)
work_dir = Path(tempfile.mkdtemp(prefix=TEMP_PREFIX + "work_"))
scratch_dir = Path(tempfile.mkdtemp(prefix=TEMP_PREFIX + "scratch_"))
log_file = output_pptx.with_suffix(".log.csv")
ensure_clean_file(log_file)
log_lines = ["image_name;size_before(kb);size_after(kb);saving(kb);saving_percent(%);in_slide_number\n"]
size_before = input_pptx.stat().st_size
result["size_before"] = size_before
with zipfile.ZipFile(input_pptx, "r") as z:
z.extractall(work_dir)
slides_dir = work_dir / "ppt" / "slides"
rels_dir = slides_dir / "_rels"
media_dir = work_dir / "ppt" / "media"
images = []
if media_dir.exists():
for f in sorted(media_dir.iterdir()):
if f.is_file() and f.suffix.lower() in ALLOWED_EXT:
images.append(f)
total = len(images)
print(f"[Processing] {input_pptx.name}: {total} Bild(er) gefunden")
print_progress(0, total)
if not which("caesiumclt"):
raise RuntimeError("'caesiumclt' nicht gefunden. Bitte installieren und in PATH verfügbar machen.")
caesium_threads = 1 if threads and threads > 1 else None
lock = Lock()
done_count = 0
image_to_slides = build_image_slide_index(rels_dir)
def worker(idx: int, img_path: Path):
nonlocal done_count
orig_size = img_path.stat().st_size
chosen_size = orig_size
found_in_slide = None
slide_nr = ""
try:
found_in_slide = image_to_slides.get(img_path.name)
if found_in_slide is None:
slide_nr = "NOT_USED"
else:
slide_nr = str(found_in_slide)
out_sub = scratch_dir / f"img_{idx:06d}"
caesium_out = compress_with_caesium(img_path, out_sub, caesium_threads, quality, min_savings)
if caesium_out and caesium_out.exists():
s = caesium_out.stat().st_size
if s < orig_size:
tmp_target = img_path.with_suffix(img_path.suffix + ".tmp")
shutil.copy2(caesium_out, tmp_target)
tmp_target.replace(img_path)
chosen_size = s
except Exception:
chosen_size = orig_size
finally:
saving = orig_size - chosen_size
saving_percent = round((saving / orig_size) * 100, 2) if orig_size > 0 else 0.0
with lock:
log_lines.append(f"{img_path.name};{human_kb(orig_size)};{human_kb(chosen_size)};{human_kb(saving)};{saving_percent};{slide_nr}\n")
done_count += 1
print_progress(done_count, total)
if total > 0:
with ThreadPoolExecutor(max_workers=max(1, threads)) as ex:
futures = [ex.submit(worker, i, p) for i, p in enumerate(images, start=1)]
for _ in as_completed(futures):
pass
print() # newline
# Safety cleanup inside work_dir
for p in work_dir.rglob("*"):
try:
if p.is_dir() and p.name.lower().startswith("caesium"):
shutil.rmtree(p, ignore_errors=True)
except Exception:
pass
if media_dir.exists():
for f in media_dir.iterdir():
if f.is_file() and f.suffix.lower() == ".tmp":
try:
f.unlink(missing_ok=True)
except Exception:
pass
zip_dir_to_pptx(work_dir, output_pptx)
size_after = output_pptx.stat().st_size
result["size_after"] = size_after
try:
with open(log_file, "w", encoding="utf-8") as f:
f.writelines(log_lines)
except Exception:
pass
elapsed = time.perf_counter() - start_time
result["elapsed_sec"] = elapsed
result["log_file"] = str(log_file)
result["ok"] = True
savings_pct = 0.0 if size_before == 0 else round(100.0 * (size_before - size_after) / size_before, 2)
print(f"[OK] Fertig! ({input_pptx.name})")
print("Zusammenfassung ----------------")
print(" Vorher: ", human_mb(size_before), "MB")
print(" Nachher: ", human_mb(size_after), "MB")
print(" Ersparnis: ", f"{savings_pct}%")
print(" Zeit: ", format_duration(elapsed))
print(" Log: ", log_file)
except Exception as e:
result["error"] = str(e)
finally:
try:
shutil.rmtree(work_dir, ignore_errors=True) # type: ignore[name-defined]
except Exception:
pass
try:
shutil.rmtree(scratch_dir, ignore_errors=True) # type: ignore[name-defined]
except Exception:
pass
cleanup_old_temps()
return result
# -------------------- Input helpers --------------------
def expand_inputs(inputs: list[str]) -> list[Path]:
files: list[Path] = []
for inp in inputs:
p = Path(inp)
if any(ch in inp for ch in ['*', '?']):
for g in glob(inp):
if g.lower().endswith('.pptx'):
files.append(Path(g).resolve())
else:
if p.is_dir():
for g in p.glob('*.pptx'):
files.append(g.resolve())
else:
if p.suffix.lower() == '.pptx':
files.append(p.resolve())
seen = set()
uniq = []
for f in files:
if str(f) not in seen:
uniq.append(f)
seen.add(str(f))
return uniq
def collect_from_dir(input_dir: Path, pattern: str, recursive: bool) -> list[Path]:
files: list[Path] = []
if recursive:
for root, _, names in os.walk(input_dir):
for n in names:
if fnmatch.fnmatch(n, pattern):
p = Path(root) / n
if p.suffix.lower() == '.pptx':
files.append(p.resolve())
else:
for p in input_dir.glob(pattern):
if p.suffix.lower() == '.pptx':
files.append(p.resolve())
seen = set()
out = []
for f in files:
s = str(f)
if s not in seen:
out.append(f)
seen.add(s)
return out
# -------------------- CLI --------------------
def main():
parser = argparse.ArgumentParser(
description="PPTX Grafik-Komprimier-Tool (nur CaesiumCLT, Multi-Thread, Batch, sauberes Cleanup)",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument('-i','--input', nargs='*', help='Input-PPTX (eine oder mehrere, Wildcards erlaubt). Bei mehreren: -O erforderlich.')
parser.add_argument('--input-dir', help='Eingabe-Verzeichnis (optional, für Batch)')
parser.add_argument('-o','--output', help='Output-PPTX (nur Single-Mode)')
parser.add_argument('-O','--output-dir', help='Output-Verzeichnis (erforderlich für Batch)')
parser.add_argument('--pattern', default='*.pptx', help='Dateimuster für --input-dir')
parser.add_argument('--recursive', action='store_true', help='Rekursiv in --input-dir suchen')
#parser.add_argument('-t','--threads', type=int, default=min(32, os.cpu_count() or 4), help='Anzahl paralleler Threads pro Datei')
parser.add_argument('-t','--threads', type=int, default=16, help='Anzahl paralleler Threads pro Datei')
parser.add_argument('-q','--quality', type=int, default=90, help='Qualität für caesiumclt (0..100), höher = bessere Qualität / größere Datei')
parser.add_argument('--min-savings', default=DEFAULT_MIN_SAVINGS, help="Mindestersparnis für caesiumclt (z. B. 2%%, 100KB, 1MB oder Bytes als Zahl)")
parser.add_argument('--version', action='version', version=f'%(prog)s {__version__}', help="Zeigt die Versionsnummer an" )
args = parser.parse_args()
print("Threads used: ", args.threads," Threads")
if args.quality < 0 or args.quality > 100:
print('[ERROR] Ungültige Qualität. Erlaubt: 0..100')
sys.exit(1)
input_files: list[Path] = []
if args.input:
input_files.extend(expand_inputs(args.input))
if args.input_dir:
input_files.extend(collect_from_dir(Path(args.input_dir), args.pattern, args.recursive))
if len(input_files) == 0:
parser.print_help()
sys.exit(1)
batch_mode = len(input_files) > 1
if batch_mode and not args.output_dir:
print('[ERROR] Batch-Modus erkannt. Bitte -O/--output-dir angeben.')
sys.exit(2)
if not which('caesiumclt'):
print("[ERROR] 'caesiumclt' nicht gefunden. Bitte installieren und in PATH verfügbar machen.")
sys.exit(3)
overall_before = 0
overall_after = 0
successes = 0
failures = 0
if batch_mode:
out_dir = Path(args.output_dir).resolve()
out_dir.mkdir(parents=True, exist_ok=True)
print(f"Batch: {len(input_files)} Datei(en). Output-Verzeichnis: {out_dir}")
for src in input_files:
if not src.exists():
print(f"- Übersprungen (nicht gefunden): {src}")
failures += 1
continue
dst = out_dir / f"{src.stem}_compressed.pptx"
res = process_single_deck(src, dst, args.threads, args.quality, args.min_savings)
if res['ok']:
successes += 1
overall_before += res['size_before']
overall_after += res['size_after']
else:
failures += 1
print(f" Fehler: {src.name} -> {res['error']}")
else:
src = input_files[0]
if args.output_dir:
Path(args.output_dir).mkdir(parents=True, exist_ok=True)
dst = Path(args.output_dir) / f"{src.stem}_compressed.pptx"
else:
dst = Path(args.output).resolve() if args.output else src.with_name(f"{src.stem}_compressed.pptx")
res = process_single_deck(src, dst, args.threads, args.quality, args.min_savings)
if res['ok']:
successes += 1
overall_before += res['size_before']
overall_after += res['size_after']
else:
failures += 1
print(f" Fehler: {src.name} -> {res['error']}")
if batch_mode:
print(f"====== Gesamt-Summary ======")
print(f"[SUCCESS] Dateien erfolgreich: {successes}")
if failures > 0:
print(f"[FAILED] Dateien fehlgeschlagen: {failures}")
if overall_before > 0:
pct = round(100.0 * (overall_before - overall_after) / overall_before, 2)
else:
pct = 0.0
print(f"Gesamtgröße vorher: {human_mb(overall_before)} MB")
print(f"Gesamtgröße nachher: {human_mb(overall_after)} MB")
print(f"Gesamt-Ersparnis: {pct}%")
if __name__ == '__main__':
main()