#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
PPTX Grafik-Komprimier-Tool (nur CaesiumCLT, Multi-Thread, Batch, sauberes Cleanup)
Version: 1.1.6

Highlights:
- Caesium-Scratch außerhalb des PPTX-Arbeitsverzeichnisses -> keine Tempfiles in finaler PPTX
- Safety-Cleanup: entfernt 'caesium*' Ordner und '*.tmp' in ppt/media, bevor gezippt wird
- Overwrite Policy: -O bigger
- Log: image_name,size_before,size_after,saving,saving_percent
- Summary inkl. Zeit benötigt

Änderungen in 1.1.6:
- Libcaesium 1.3.0 kann nun auch files ignorieren, wenn die Kompression kleiner als <MIN_SAVING> ist
"""
|
|
import argparse
|
|
import os
|
|
import re
|
|
import xml.etree.ElementTree as ET
|
|
import sys
|
|
import zipfile
|
|
import tempfile
|
|
import shutil
|
|
import subprocess
|
|
import time
|
|
import fnmatch
|
|
from glob import glob
|
|
from pathlib import Path
|
|
from datetime import timedelta
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from threading import Lock
|
|
from dataclasses import dataclass
|
|
from typing import Callable, List, Optional
|
|
|
|
|
|
__version__ = "1.1.6"
|
|
|
|
ALLOWED_EXT = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
|
|
PROGRESS_BAR_LEN = 40
|
|
TEMP_PREFIX = "pptx_compress_"
|
|
DEFAULT_MIN_SAVINGS = "2%"
|
|
|
|
|
|
@dataclass
|
|
class DeckResult:
|
|
input: str
|
|
output: str
|
|
ok: bool = False
|
|
size_before: int = 0
|
|
size_after: int = 0
|
|
elapsed_sec: float = 0.0
|
|
error: Optional[str] = None
|
|
log_file: Optional[str] = None
|
|
|
|
|
|
@dataclass
|
|
class ImageProcessResult:
|
|
image_name: str
|
|
orig_size: int
|
|
chosen_size: int
|
|
slide_nr: str
|
|
|
|
|
|
def discover_images(media_dir: Path) -> list[Path]:
|
|
images: list[Path] = []
|
|
if media_dir.exists():
|
|
for f in sorted(media_dir.iterdir()):
|
|
if f.is_file() and f.suffix.lower() in ALLOWED_EXT:
|
|
images.append(f)
|
|
return images
|
|
|
|
|
|
def image_result_to_log_line(image_result: ImageProcessResult) -> str:
|
|
saving = image_result.orig_size - image_result.chosen_size
|
|
saving_percent = round((saving / image_result.orig_size) * 100, 2) if image_result.orig_size > 0 else 0.0
|
|
return f"{image_result.image_name};{human_kb(image_result.orig_size)};{human_kb(image_result.chosen_size)};{human_kb(saving)};{saving_percent};{image_result.slide_nr}\n"
|
|
|
|
|
|
# -------------------- Utilities --------------------
|
|
def human_mb(nbytes: int) -> float:
|
|
return round(nbytes / (1024 * 1024), 2)
|
|
|
|
def human_kb(nbytes: int) -> float:
|
|
return round(nbytes / 1024,2)
|
|
|
|
def ensure_clean_file(path: Path):
|
|
if path.exists():
|
|
try:
|
|
if path.is_file():
|
|
path.unlink()
|
|
else:
|
|
shutil.rmtree(path, ignore_errors=True)
|
|
except Exception:
|
|
pass
|
|
|
|
def cleanup_old_temps():
|
|
tmp_root = Path(tempfile.gettempdir())
|
|
for p in tmp_root.glob(f"{TEMP_PREFIX}*"):
|
|
try:
|
|
if p.is_dir():
|
|
shutil.rmtree(p, ignore_errors=True)
|
|
else:
|
|
p.unlink(missing_ok=True)
|
|
except Exception:
|
|
pass
|
|
|
|
def print_progress(i: int, total: int):
|
|
if total <= 0:
|
|
return
|
|
done = int(PROGRESS_BAR_LEN * i / total)
|
|
bar = "█" * done + "-" * (PROGRESS_BAR_LEN - done)
|
|
pct = int(i * 100 / total)
|
|
print(f"\rBilder: |{bar}| {i}/{total} ({pct}%)", end="", flush=True)
|
|
|
|
def zip_dir_to_pptx(src_dir: Path, out_pptx: Path):
|
|
with zipfile.ZipFile(out_pptx, "w", compression=zipfile.ZIP_DEFLATED) as z:
|
|
for root, _, files in os.walk(src_dir):
|
|
for f in files:
|
|
full = Path(root) / f
|
|
rel = full.relative_to(src_dir)
|
|
z.write(full, arcname=str(rel))
|
|
|
|
def which(cmd: str):
|
|
return shutil.which(cmd)
|
|
|
|
def compress_with_caesium(original: Path, out_dir: Path, caesium_threads: int | None, quality: int, min_savings: str) -> Path | None:
|
|
exe = which("caesiumclt")
|
|
if not exe:
|
|
raise RuntimeError("[ERROR] 'caesiumclt' wurde nicht gefunden. Bitte CaesiumCLT installieren und in PATH verfügbar machen.")
|
|
out_dir.mkdir(parents=True, exist_ok=True)
|
|
ext = original.suffix.lower()
|
|
if ext not in {".jpg", ".jpeg", ".png", ".webp", ".gif"}:
|
|
return None
|
|
cmd = [exe, "-q", str(quality), "-O", "bigger", "--min-savings", min_savings, "-o", str(out_dir)]
|
|
if caesium_threads is not None:
|
|
cmd += ["--threads", str(caesium_threads)]
|
|
cmd += [str(original)]
|
|
try:
|
|
r = subprocess.run(cmd, capture_output=True, text=True)
|
|
if r.returncode != 0:
|
|
sys.stderr.write(f"[caesiumclt] Fehler bei {original.name}:{r.stderr}")
|
|
return None
|
|
out_file = out_dir / original.name
|
|
return out_file if out_file.exists() else None
|
|
except Exception as ex:
|
|
sys.stderr.write(f"[caesiumclt] Ausnahme bei {original.name}: {ex}")
|
|
return None
|
|
|
|
def format_duration(seconds: float) -> str:
|
|
total_ms = int(round(seconds * 1000))
|
|
td = timedelta(milliseconds=total_ms)
|
|
base = str(td)
|
|
if "." in base:
|
|
hms, frac = base.split(".", 1)
|
|
return f"{hms}.{frac[:2]}"
|
|
return base
|
|
|
|
def build_image_slide_index(rels_dir: Path) -> dict[str, List[int]]:
|
|
if not rels_dir.exists() or not rels_dir.is_dir():
|
|
return {}
|
|
|
|
image_to_slides: dict[str, set[int]] = {}
|
|
|
|
for rels_path in rels_dir.iterdir():
|
|
rels_file = rels_path.name
|
|
if rels_file.startswith("slide") and rels_file.endswith(".xml.rels") and rels_path.is_file():
|
|
match = re.search(r"slide(\d+)\.xml\.rels$", rels_file)
|
|
if not match:
|
|
continue
|
|
slide_number = int(match.group(1))
|
|
try:
|
|
tree = ET.parse(rels_path)
|
|
root = tree.getroot()
|
|
for rel in root.findall(".//{http://schemas.openxmlformats.org/package/2006/relationships}Relationship"):
|
|
target = rel.attrib.get("Target", "")
|
|
image_name = Path(target).name
|
|
if image_name:
|
|
if image_name not in image_to_slides:
|
|
image_to_slides[image_name] = set()
|
|
image_to_slides[image_name].add(slide_number)
|
|
except (ET.ParseError, OSError):
|
|
print(f"Fehler beim Lesen von {rels_file}")
|
|
|
|
return {img: sorted(slides) for img, slides in image_to_slides.items()}
|
|
|
|
|
|
def get_slide_numbers_for_image(rels_dir: Path, image_filename: str) -> Optional[List[int]]:
|
|
image_to_slides = build_image_slide_index(rels_dir)
|
|
slides = image_to_slides.get(image_filename)
|
|
return slides if slides else None
|
|
|
|
|
|
def process_image_file(
|
|
idx: int,
|
|
img_path: Path,
|
|
scratch_dir: Path,
|
|
image_to_slides: dict[str, List[int]],
|
|
caesium_threads: int | None,
|
|
quality: int,
|
|
min_savings: str,
|
|
compressor: Callable[[Path, Path, int | None, int, str], Path | None],
|
|
) -> ImageProcessResult:
|
|
orig_size = img_path.stat().st_size
|
|
chosen_size = orig_size
|
|
found_in_slide = image_to_slides.get(img_path.name)
|
|
slide_nr = "NOT_USED" if found_in_slide is None else str(found_in_slide)
|
|
|
|
try:
|
|
out_sub = scratch_dir / f"img_{idx:06d}"
|
|
caesium_out = compressor(img_path, out_sub, caesium_threads, quality, min_savings)
|
|
if caesium_out and caesium_out.exists():
|
|
s = caesium_out.stat().st_size
|
|
if s < orig_size:
|
|
tmp_target = img_path.with_suffix(img_path.suffix + ".tmp")
|
|
shutil.copy2(caesium_out, tmp_target)
|
|
tmp_target.replace(img_path)
|
|
chosen_size = s
|
|
except Exception:
|
|
chosen_size = orig_size
|
|
|
|
return ImageProcessResult(
|
|
image_name=img_path.name,
|
|
orig_size=orig_size,
|
|
chosen_size=chosen_size,
|
|
slide_nr=slide_nr,
|
|
)
|
|
|
|
|
|
# -------------------- Core per-deck processing --------------------
|
|
def process_single_deck(
|
|
input_pptx: Path,
|
|
output_pptx: Path,
|
|
threads: int,
|
|
quality: int,
|
|
min_savings: str,
|
|
compressor: Callable[[Path, Path, int | None, int, str], Path | None] = compress_with_caesium,
|
|
) -> DeckResult:
|
|
start_time = time.perf_counter()
|
|
result = DeckResult(
|
|
input=str(input_pptx),
|
|
output=str(output_pptx),
|
|
)
|
|
|
|
try:
|
|
if not input_pptx.exists() or input_pptx.suffix.lower() != ".pptx":
|
|
raise ValueError("Eingabedatei existiert nicht oder ist keine .pptx")
|
|
|
|
cleanup_old_temps()
|
|
ensure_clean_file(output_pptx)
|
|
|
|
work_dir = Path(tempfile.mkdtemp(prefix=TEMP_PREFIX + "work_"))
|
|
scratch_dir = Path(tempfile.mkdtemp(prefix=TEMP_PREFIX + "scratch_"))
|
|
|
|
log_file = output_pptx.with_suffix(".log.csv")
|
|
ensure_clean_file(log_file)
|
|
log_lines = ["image_name;size_before(kb);size_after(kb);saving(kb);saving_percent(%);in_slide_number\n"]
|
|
|
|
size_before = input_pptx.stat().st_size
|
|
result.size_before = size_before
|
|
|
|
with zipfile.ZipFile(input_pptx, "r") as z:
|
|
z.extractall(work_dir)
|
|
|
|
slides_dir = work_dir / "ppt" / "slides"
|
|
rels_dir = slides_dir / "_rels"
|
|
media_dir = work_dir / "ppt" / "media"
|
|
|
|
images = discover_images(media_dir)
|
|
|
|
total = len(images)
|
|
print(f"[Processing] {input_pptx.name}: {total} Bild(er) gefunden")
|
|
print_progress(0, total)
|
|
|
|
if not which("caesiumclt") and compressor is compress_with_caesium:
|
|
raise RuntimeError("'caesiumclt' nicht gefunden. Bitte installieren und in PATH verfügbar machen.")
|
|
|
|
caesium_threads = 1 if threads and threads > 1 else None
|
|
lock = Lock()
|
|
done_count = 0
|
|
image_to_slides = build_image_slide_index(rels_dir)
|
|
|
|
def worker(idx: int, img_path: Path):
|
|
nonlocal done_count
|
|
image_result = process_image_file(
|
|
idx=idx,
|
|
img_path=img_path,
|
|
scratch_dir=scratch_dir,
|
|
image_to_slides=image_to_slides,
|
|
caesium_threads=caesium_threads,
|
|
quality=quality,
|
|
min_savings=min_savings,
|
|
compressor=compressor,
|
|
)
|
|
|
|
with lock:
|
|
log_lines.append(image_result_to_log_line(image_result))
|
|
done_count += 1
|
|
print_progress(done_count, total)
|
|
|
|
if total > 0:
|
|
with ThreadPoolExecutor(max_workers=max(1, threads)) as ex:
|
|
futures = [ex.submit(worker, i, p) for i, p in enumerate(images, start=1)]
|
|
for _ in as_completed(futures):
|
|
pass
|
|
|
|
print() # newline
|
|
|
|
# Safety cleanup inside work_dir
|
|
for p in work_dir.rglob("*"):
|
|
try:
|
|
if p.is_dir() and p.name.lower().startswith("caesium"):
|
|
shutil.rmtree(p, ignore_errors=True)
|
|
except Exception:
|
|
pass
|
|
|
|
if media_dir.exists():
|
|
for f in media_dir.iterdir():
|
|
if f.is_file() and f.suffix.lower() == ".tmp":
|
|
try:
|
|
f.unlink(missing_ok=True)
|
|
except Exception:
|
|
pass
|
|
|
|
zip_dir_to_pptx(work_dir, output_pptx)
|
|
size_after = output_pptx.stat().st_size
|
|
result.size_after = size_after
|
|
|
|
try:
|
|
with open(log_file, "w", encoding="utf-8") as f:
|
|
f.writelines(log_lines)
|
|
except Exception:
|
|
pass
|
|
|
|
elapsed = time.perf_counter() - start_time
|
|
result.elapsed_sec = elapsed
|
|
result.log_file = str(log_file)
|
|
result.ok = True
|
|
|
|
savings_pct = 0.0 if size_before == 0 else round(100.0 * (size_before - size_after) / size_before, 2)
|
|
print(f"[OK] Fertig! ({input_pptx.name})")
|
|
print("Zusammenfassung ----------------")
|
|
print(" Vorher: ", human_mb(size_before), "MB")
|
|
print(" Nachher: ", human_mb(size_after), "MB")
|
|
print(" Ersparnis: ", f"{savings_pct}%")
|
|
print(" Zeit: ", format_duration(elapsed))
|
|
print(" Log: ", log_file)
|
|
|
|
except Exception as e:
|
|
result.error = str(e)
|
|
finally:
|
|
try:
|
|
shutil.rmtree(work_dir, ignore_errors=True) # type: ignore[name-defined]
|
|
except Exception:
|
|
pass
|
|
try:
|
|
shutil.rmtree(scratch_dir, ignore_errors=True) # type: ignore[name-defined]
|
|
except Exception:
|
|
pass
|
|
cleanup_old_temps()
|
|
|
|
return result
|
|
|
|
# -------------------- Input helpers --------------------
|
|
def expand_inputs(inputs: list[str]) -> list[Path]:
|
|
files: list[Path] = []
|
|
for inp in inputs:
|
|
p = Path(inp)
|
|
if any(ch in inp for ch in ['*', '?']):
|
|
for g in glob(inp):
|
|
if g.lower().endswith('.pptx'):
|
|
files.append(Path(g).resolve())
|
|
else:
|
|
if p.is_dir():
|
|
for g in p.glob('*.pptx'):
|
|
files.append(g.resolve())
|
|
else:
|
|
if p.suffix.lower() == '.pptx':
|
|
files.append(p.resolve())
|
|
seen = set()
|
|
uniq = []
|
|
for f in files:
|
|
if str(f) not in seen:
|
|
uniq.append(f)
|
|
seen.add(str(f))
|
|
return uniq
|
|
|
|
|
|
def collect_from_dir(input_dir: Path, pattern: str, recursive: bool) -> list[Path]:
|
|
files: list[Path] = []
|
|
if recursive:
|
|
for root, _, names in os.walk(input_dir):
|
|
for n in names:
|
|
if fnmatch.fnmatch(n, pattern):
|
|
p = Path(root) / n
|
|
if p.suffix.lower() == '.pptx':
|
|
files.append(p.resolve())
|
|
else:
|
|
for p in input_dir.glob(pattern):
|
|
if p.suffix.lower() == '.pptx':
|
|
files.append(p.resolve())
|
|
seen = set()
|
|
out = []
|
|
for f in files:
|
|
s = str(f)
|
|
if s not in seen:
|
|
out.append(f)
|
|
seen.add(s)
|
|
return out
|
|
|
|
# -------------------- CLI --------------------
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="PPTX Grafik-Komprimier-Tool (nur CaesiumCLT, Multi-Thread, Batch, sauberes Cleanup)",
|
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
|
)
|
|
parser.add_argument('-i','--input', nargs='*', help='Input-PPTX (eine oder mehrere, Wildcards erlaubt). Bei mehreren: -O erforderlich.')
|
|
parser.add_argument('--input-dir', help='Eingabe-Verzeichnis (optional, für Batch)')
|
|
parser.add_argument('-o','--output', help='Output-PPTX (nur Single-Mode)')
|
|
parser.add_argument('-O','--output-dir', help='Output-Verzeichnis (erforderlich für Batch)')
|
|
parser.add_argument('--pattern', default='*.pptx', help='Dateimuster für --input-dir')
|
|
parser.add_argument('--recursive', action='store_true', help='Rekursiv in --input-dir suchen')
|
|
#parser.add_argument('-t','--threads', type=int, default=min(32, os.cpu_count() or 4), help='Anzahl paralleler Threads pro Datei')
|
|
parser.add_argument('-t','--threads', type=int, default=16, help='Anzahl paralleler Threads pro Datei')
|
|
parser.add_argument('-q','--quality', type=int, default=90, help='Qualität für caesiumclt (0..100), höher = bessere Qualität / größere Datei')
|
|
parser.add_argument('--min-savings', default=DEFAULT_MIN_SAVINGS, help="Mindestersparnis für caesiumclt (z. B. 2%%, 100KB, 1MB oder Bytes als Zahl)")
|
|
parser.add_argument('--version', action='version', version=f'%(prog)s {__version__}', help="Zeigt die Versionsnummer an" )
|
|
|
|
args = parser.parse_args()
|
|
|
|
print("Threads used: ", args.threads," Threads")
|
|
|
|
if args.quality < 0 or args.quality > 100:
|
|
print('[ERROR] Ungültige Qualität. Erlaubt: 0..100')
|
|
sys.exit(1)
|
|
|
|
input_files: list[Path] = []
|
|
if args.input:
|
|
input_files.extend(expand_inputs(args.input))
|
|
if args.input_dir:
|
|
input_files.extend(collect_from_dir(Path(args.input_dir), args.pattern, args.recursive))
|
|
|
|
if len(input_files) == 0:
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
|
|
batch_mode = len(input_files) > 1
|
|
|
|
if batch_mode and not args.output_dir:
|
|
print('[ERROR] Batch-Modus erkannt. Bitte -O/--output-dir angeben.')
|
|
sys.exit(2)
|
|
|
|
if not which('caesiumclt'):
|
|
print("[ERROR] 'caesiumclt' nicht gefunden. Bitte installieren und in PATH verfügbar machen.")
|
|
sys.exit(3)
|
|
|
|
overall_before = 0
|
|
overall_after = 0
|
|
successes = 0
|
|
failures = 0
|
|
|
|
if batch_mode:
|
|
out_dir = Path(args.output_dir).resolve()
|
|
out_dir.mkdir(parents=True, exist_ok=True)
|
|
print(f"Batch: {len(input_files)} Datei(en). Output-Verzeichnis: {out_dir}")
|
|
for src in input_files:
|
|
if not src.exists():
|
|
print(f"- Übersprungen (nicht gefunden): {src}")
|
|
failures += 1
|
|
continue
|
|
dst = out_dir / f"{src.stem}_compressed.pptx"
|
|
res = process_single_deck(src, dst, args.threads, args.quality, args.min_savings)
|
|
if res.ok:
|
|
successes += 1
|
|
overall_before += res.size_before
|
|
overall_after += res.size_after
|
|
else:
|
|
failures += 1
|
|
print(f" Fehler: {src.name} -> {res.error}")
|
|
else:
|
|
src = input_files[0]
|
|
if args.output_dir:
|
|
Path(args.output_dir).mkdir(parents=True, exist_ok=True)
|
|
dst = Path(args.output_dir) / f"{src.stem}_compressed.pptx"
|
|
else:
|
|
dst = Path(args.output).resolve() if args.output else src.with_name(f"{src.stem}_compressed.pptx")
|
|
res = process_single_deck(src, dst, args.threads, args.quality, args.min_savings)
|
|
if res.ok:
|
|
successes += 1
|
|
overall_before += res.size_before
|
|
overall_after += res.size_after
|
|
else:
|
|
failures += 1
|
|
print(f" Fehler: {src.name} -> {res.error}")
|
|
|
|
if batch_mode:
|
|
|
|
print(f"====== Gesamt-Summary ======")
|
|
print(f"[SUCCESS] Dateien erfolgreich: {successes}")
|
|
|
|
if failures > 0:
|
|
print(f"[FAILED] Dateien fehlgeschlagen: {failures}")
|
|
|
|
if overall_before > 0:
|
|
pct = round(100.0 * (overall_before - overall_after) / overall_before, 2)
|
|
else:
|
|
pct = 0.0
|
|
print(f"Gesamtgröße vorher: {human_mb(overall_before)} MB")
|
|
print(f"Gesamtgröße nachher: {human_mb(overall_after)} MB")
|
|
print(f"Gesamt-Ersparnis: {pct}%")
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|