Files
pptx-image-compress/pptx_image_compress.py
T
frank.conrads 75059f829a Add SVG compression via npx svgo
Add vector extension support for .svg and route SVG files through npx svgo before raster compression.

Keep behavior fail-safe: missing npx/svgo or non-zero svgo exit returns None and preserves existing flow.

Extend tests for SVG discovery, SVG routing priority, and missing npx handling.

Co-Authored-By: Abacus.AI CLI <agent@abacus.ai>
2026-06-08 13:40:45 +02:00

758 lines
26 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
PPTX image compression tool (CaesiumCLT for raster images, svgo for SVG; multi-threaded, batch mode, clean temp-file handling)
Version: 1.1.7
Highlights:
- The Caesium scratch directory lives outside the PPTX working directory -> no temp files end up in the final PPTX
- Safety cleanup: removes 'caesium*' folders and '*.tmp' files in ppt/media before zipping
- Overwrite policy: -O bigger
- Log columns (semicolon-separated): image_name, size_before, size_after, saving, saving_percent, in_slide_number, image_type_changed
- Summary including the time needed
- SVG files are optimized with `npx svgo` when npx is available
Changes in 1.1.7:
- Added a PNG->JPG fallback for large PNGs (when they are still > 500 KB after compression)
- Extended logging: new column image_type_changed with the value png_jpg when the type changed
Changes in 1.1.6:
- Libcaesium 1.3.0 can now also skip files when the savings are smaller than <MIN_SAVING>
"""
import argparse
import inspect
import os
import re
import xml.etree.ElementTree as ET
import sys
import zipfile
import tempfile
import shutil
import subprocess
import time
import fnmatch
from glob import glob
from pathlib import Path
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from dataclasses import dataclass
from typing import Callable, List, Optional
# Tool version; shown by the --version command line flag.
__version__ = "1.1.7"
# Raster image extensions (lower-case, including the leading dot).
RASTER_EXT = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
# Vector image extensions (lower-case, including the leading dot).
VECTOR_EXT = {".svg"}
# Every media extension that discover_images() picks up.
ALLOWED_EXT = RASTER_EXT | VECTOR_EXT
# Width of the console progress bar, in characters.
PROGRESS_BAR_LEN = 40
# Name prefix of the temporary directories created per deck; cleanup_old_temps()
# removes every entry in the system temp directory that starts with it.
TEMP_PREFIX = "pptx_compress_"
# Default value of the --min-savings command line option.
DEFAULT_MIN_SAVINGS = "2%"
# A PNG that is still larger than this after compression is also tried as JPEG.
PNG_TO_JPEG_THRESHOLD_BYTES = 500 * 1024
@dataclass
class DeckResult:
input: str
output: str
ok: bool = False
size_before: int = 0
size_after: int = 0
elapsed_sec: float = 0.0
error: Optional[str] = None
log_file: Optional[str] = None
@dataclass
class ImageProcessResult:
    """Per-image outcome that ends up as one row of the CSV log."""

    # File name of the image as it is stored after processing.
    image_name: str
    # Size in bytes before processing.
    orig_size: int
    # Size in bytes of the file that was finally kept.
    chosen_size: int
    # Text for the "in_slide_number" log column.
    slide_nr: str
    # "png_jpg" when the image was converted from PNG to JPEG, otherwise empty.
    image_type_changed: str = ""
def discover_images(media_dir: Path) -> list[Path]:
    """Return the compressible media files directly inside *media_dir*.

    Only regular files whose lower-cased suffix is in ALLOWED_EXT are
    returned, sorted by path. A missing directory yields an empty list.
    """
    if not media_dir.exists():
        return []
    return [
        entry
        for entry in sorted(media_dir.iterdir())
        if entry.is_file() and entry.suffix.lower() in ALLOWED_EXT
    ]
def image_result_to_log_line(image_result: ImageProcessResult) -> str:
    """Format one image result as a semicolon-separated log row.

    Sizes are written in KiB (via human_kb); the saving percentage is relative
    to the original size and is 0.0 when that size is zero. The row ends with
    a newline.
    """
    before = image_result.orig_size
    after = image_result.chosen_size
    saved = before - after
    if before > 0:
        saved_percent = round((saved / before) * 100, 2)
    else:
        saved_percent = 0.0
    columns = (
        image_result.image_name,
        human_kb(before),
        human_kb(after),
        human_kb(saved),
        saved_percent,
        image_result.slide_nr,
        image_result.image_type_changed,
    )
    return ";".join(f"{column}" for column in columns) + "\n"
# -------------------- Utilities --------------------
def human_mb(nbytes: int) -> float:
    """Convert a byte count to MiB, rounded to two decimals."""
    bytes_per_mib = 1024 * 1024
    mebibytes = nbytes / bytes_per_mib
    return round(mebibytes, 2)
def human_kb(nbytes: int) -> float:
    """Convert a byte count to KiB, rounded to two decimals."""
    kibibytes = nbytes / 1024
    return round(kibibytes, 2)
def ensure_clean_file(path: Path):
    """Remove *path* on a best-effort basis, be it a file or a directory tree.

    Nothing happens when the path does not exist; errors while deleting are
    ignored.
    """
    if not path.exists():
        return
    try:
        if not path.is_file():
            shutil.rmtree(path, ignore_errors=True)
        else:
            path.unlink()
    except Exception:
        # Deliberate best effort: a leftover entry is tolerated here.
        pass
def cleanup_old_temps():
    """Delete every entry in the system temp directory named TEMP_PREFIX*.

    Directories are removed recursively, anything else is unlinked; failures
    on individual entries are ignored.
    """
    # NOTE(review): the pattern would also match the directories of another
    # instance of this tool running at the same time - confirm that is acceptable.
    stale_entries = Path(tempfile.gettempdir()).glob(f"{TEMP_PREFIX}*")
    for entry in stale_entries:
        try:
            if not entry.is_dir():
                entry.unlink(missing_ok=True)
                continue
            shutil.rmtree(entry, ignore_errors=True)
        except Exception:
            pass
def print_progress(i: int, total: int):
    """Redraw the single-line console progress bar for *i* of *total* images.

    Prints nothing when *total* is not positive. The line starts with a
    carriage return and has no trailing newline, so it overwrites itself.
    """
    if total <= 0:
        return
    filled = int(PROGRESS_BAR_LEN * i / total)
    percent = int(i * 100 / total)
    bar = "".join(("█" * filled, "-" * (PROGRESS_BAR_LEN - filled)))
    print(f"\rBilder: |{bar}| {i}/{total} ({percent}%)", end="", flush=True)
def zip_dir_to_pptx(src_dir: Path, out_pptx: Path):
    """Pack every file below *src_dir* into the deflate-compressed zip *out_pptx*.

    Files named "[Content_Types].xml" are written first; all other files
    follow in directory-walk order. Archive names are relative to *src_dir*.
    """
    entries = [
        Path(folder) / filename
        for folder, _, filenames in os.walk(src_dir)
        for filename in filenames
    ]
    # Stable sort: content-type files move to the front, the rest keeps its order.
    entries.sort(key=lambda entry: entry.name != "[Content_Types].xml")
    with zipfile.ZipFile(out_pptx, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for entry in entries:
            archive.write(entry, arcname=str(entry.relative_to(src_dir)))
def which(cmd: str):
    """Return the full path of the executable *cmd* found on PATH, or None."""
    resolved = shutil.which(cmd)
    return resolved
def compress_with_caesium(
    original: Path,
    out_dir: Path,
    caesium_threads: int | None,
    quality: int,
    min_savings: str,
    output_format: str = "original",
) -> Path | None:
    """Compress one raster image with the ``caesiumclt`` command line tool.

    Args:
        original: Image to compress; it is only read, never modified.
        out_dir: Directory caesiumclt writes its result into (created if needed).
        caesium_threads: Value for ``--threads``; the flag is omitted when None.
        quality: Value for ``-q``.
        min_savings: Value for ``--min-savings``.
        output_format: Value for ``--format``; with "jpeg" the result is looked
            up as ``<stem>.jpg`` or ``<stem>.jpeg``.

    Returns:
        Path of the produced file, or None when the extension is not a raster
        format, caesiumclt failed, or no output file exists afterwards.

    Raises:
        RuntimeError: when ``caesiumclt`` is not available on PATH.
    """
    exe = which("caesiumclt")
    if not exe:
        raise RuntimeError("[ERROR] 'caesiumclt' wurde nicht gefunden. Bitte CaesiumCLT installieren und in PATH verfügbar machen.")
    out_dir.mkdir(parents=True, exist_ok=True)
    # Check against RASTER_EXT, not ALLOWED_EXT: the latter also contains ".svg",
    # so an SVG whose svgo run failed would otherwise be handed to this raster
    # compressor and only produce an error message.
    if original.suffix.lower() not in RASTER_EXT:
        return None
    cmd = [
        exe,
        "-q",
        str(quality),
        "-O",
        "bigger",
        "--min-savings",
        min_savings,
        "--format",
        output_format,
        "-o",
        str(out_dir),
    ]
    if caesium_threads is not None:
        cmd += ["--threads", str(caesium_threads)]
    cmd += [str(original)]
    try:
        r = subprocess.run(cmd, capture_output=True, text=True)
        if r.returncode != 0:
            # Always terminate the diagnostic with exactly one newline so that
            # consecutive messages do not run together.
            sys.stderr.write(f"[caesiumclt] Fehler bei {original.name}:{r.stderr.rstrip()}\n")
            return None
        if output_format == "jpeg":
            # The converted file may carry either JPEG extension.
            for suffix in (".jpg", ".jpeg"):
                candidate = out_dir / f"{original.stem}{suffix}"
                if candidate.exists():
                    return candidate
            return None
        out_file = out_dir / original.name
        return out_file if out_file.exists() else None
    except Exception as ex:
        # Best effort: a failing image must not abort the whole deck.
        sys.stderr.write(f"[caesiumclt] Ausnahme bei {original.name}: {ex}\n")
        return None
def compressor_accepts_output_format(compressor: Callable[..., Path | None]) -> bool:
    """Tell whether *compressor* can be called with an ``output_format`` argument.

    The built-in caesium compressor always qualifies. Any other callable
    qualifies when its signature has a parameter named "output_format";
    callables whose signature cannot be inspected do not.
    """
    if compressor is compress_with_caesium:
        return True
    try:
        parameters = inspect.signature(compressor).parameters
    except (TypeError, ValueError):
        return False
    return "output_format" in parameters
def run_compressor(
    compressor: Callable[..., Path | None],
    original: Path,
    out_dir: Path,
    caesium_threads: int | None,
    quality: int,
    min_savings: str,
    output_format: str = "original",
) -> Path | None:
    """Call *compressor* with the argument list it supports.

    Compressors that accept ``output_format`` receive it as sixth positional
    argument. All others are called with five arguments, and only when the
    original format is requested; a format conversion they cannot perform
    yields None without calling them.
    """
    positional = (original, out_dir, caesium_threads, quality, min_savings)
    if compressor_accepts_output_format(compressor):
        return compressor(*positional, output_format)
    if output_format != "original":
        return None
    return compressor(*positional)
def compress_raster_image(
    compressor: Callable[..., Path | None],
    original: Path,
    out_dir: Path,
    caesium_threads: int | None,
    quality: int,
    min_savings: str,
) -> Path | None:
    """Compress *original* with *compressor*, keeping its image format.

    Delegates to run_compressor() with the default output format and returns
    whatever that call returns.
    """
    compressed = run_compressor(
        compressor,
        original,
        out_dir,
        caesium_threads,
        quality,
        min_savings,
    )
    return compressed
def compress_svg_with_svgo(
    original: Path,
    out_dir: Path,
) -> Path | None:
    """Optimize one SVG file by running ``npx --yes svgo``.

    Args:
        original: SVG file to optimize; it is only read, never modified.
        out_dir: Directory for the optimized copy (created if needed); the
            copy keeps the original file name.

    Returns:
        Path of the optimized file, or None when the file is not a vector
        format, ``npx`` is not on PATH, svgo exits with a non-zero code, the
        call raises, or no output file exists afterwards.
    """
    if original.suffix.lower() not in VECTOR_EXT:
        return None
    npx_exe = which("npx")
    if not npx_exe:
        return None
    out_dir.mkdir(parents=True, exist_ok=True)
    out_file = out_dir / original.name
    cmd = [
        npx_exe,
        "--yes",
        "svgo",
        str(original),
        "-o",
        str(out_file),
    ]
    try:
        r = subprocess.run(cmd, capture_output=True, text=True)
        if r.returncode != 0:
            # Always terminate the diagnostic with exactly one newline so that
            # consecutive messages do not run together.
            sys.stderr.write(f"[svgo] Fehler bei {original.name}:{r.stderr.rstrip()}\n")
            return None
        return out_file if out_file.exists() else None
    except Exception as ex:
        # Best effort: a failing SVG must not abort the whole deck.
        sys.stderr.write(f"[svgo] Ausnahme bei {original.name}: {ex}\n")
        return None
def compress_vector_image(
    original: Path,
    out_dir: Path,
) -> Path | None:
    """Compress a vector image; only ".svg" files are handled.

    Returns the result of compress_svg_with_svgo() for SVG files and None for
    every other extension.
    """
    if original.suffix.lower() != ".svg":
        return None
    return compress_svg_with_svgo(original=original, out_dir=out_dir)
def compress_image_with_routing(
    compressor: Callable[..., Path | None],
    original: Path,
    out_dir: Path,
    caesium_threads: int | None,
    quality: int,
    min_savings: str,
) -> Path | None:
    """Try the vector pipeline first and fall back to the raster compressor.

    Whenever compress_vector_image() yields no result (including a failed SVG
    optimization), the image is passed on to compress_raster_image().
    """
    vector_out = compress_vector_image(original=original, out_dir=out_dir)
    if vector_out is None:
        return compress_raster_image(
            compressor=compressor,
            original=original,
            out_dir=out_dir,
            caesium_threads=caesium_threads,
            quality=quality,
            min_savings=min_savings,
        )
    return vector_out
def update_relationship_targets(work_dir: Path, old_name: str, new_name: str) -> None:
    """Point every internal relationship at *new_name* instead of *old_name*.

    All ``*.rels`` files below *work_dir* are scanned. A relationship is
    rewritten when the last path segment of its ``Target`` equals *old_name*;
    the directory part of the target is kept. Relationships marked
    ``TargetMode="External"`` are left alone. Files that cannot be parsed or
    written are skipped.

    Args:
        work_dir: Root of the unpacked presentation.
        old_name: File name the relationships currently end with.
        new_name: File name to put in its place.
    """
    rels_uri = "http://schemas.openxmlformats.org/package/2006/relationships"
    rels_namespace = f"{{{rels_uri}}}Relationship"
    # Without this registration ElementTree serializes the document's default
    # namespace with generated "ns0:" prefixes; keep the original form instead.
    ET.register_namespace("", rels_uri)
    for rels_file in work_dir.rglob("*.rels"):
        try:
            tree = ET.parse(rels_file)
            root = tree.getroot()
            changed = False
            for rel in root.findall(f".//{rels_namespace}"):
                # External targets (e.g. hyperlinks) may end in the same file
                # name but do not refer to a part of this package.
                if rel.attrib.get("TargetMode") == "External":
                    continue
                target = rel.attrib.get("Target", "")
                if Path(target).name != old_name:
                    continue
                # A callable replacement keeps backslashes or group references
                # in new_name from being interpreted as a regex template.
                rel.attrib["Target"] = re.sub(r"[^/\\]+$", lambda _match: new_name, target)
                changed = True
            if changed:
                tree.write(rels_file, encoding="utf-8", xml_declaration=True)
        except (ET.ParseError, OSError):
            continue
def ensure_jpg_content_type(work_dir: Path) -> None:
    """Make sure ``[Content_Types].xml`` declares the "jpg" extension.

    When no ``Default`` entry for "jpg" exists (compared case-insensitively),
    one with content type ``image/jpeg`` is appended and the file is rewritten.
    A missing, unparsable or unwritable file is left as it is.

    Args:
        work_dir: Root of the unpacked presentation.
    """
    content_types_path = work_dir / "[Content_Types].xml"
    if not content_types_path.exists():
        return
    types_uri = "http://schemas.openxmlformats.org/package/2006/content-types"
    default_tag = f"{{{types_uri}}}Default"
    try:
        tree = ET.parse(content_types_path)
        root = tree.getroot()
        has_jpg_default = any(
            default.attrib.get("Extension", "").lower() == "jpg"
            for default in root.findall(default_tag)
        )
        if has_jpg_default:
            return
        ET.SubElement(
            root,
            default_tag,
            {
                "Extension": "jpg",
                "ContentType": "image/jpeg",
            },
        )
        # Without this registration ElementTree serializes the document's default
        # namespace with generated "ns0:" prefixes; keep the original form instead.
        ET.register_namespace("", types_uri)
        tree.write(content_types_path, encoding="utf-8", xml_declaration=True)
    except (ET.ParseError, OSError):
        return
def format_duration(seconds: float) -> str:
    """Render *seconds* as ``H:MM:SS`` with at most two fractional digits.

    The value is rounded to whole milliseconds first; the fractional part is
    then cut (not rounded) to two digits and omitted when it is zero.
    """
    millis = int(round(seconds * 1000))
    rendered = str(timedelta(milliseconds=millis))
    whole, dot, fraction = rendered.partition(".")
    if not dot:
        return rendered
    return f"{whole}.{fraction[:2]}"
def build_image_slide_index(rels_dir: Path) -> dict[str, List[int]]:
    """Map relationship target file names to the slides that reference them.

    Every ``slide<N>.xml.rels`` file directly inside *rels_dir* is parsed. For
    each relationship the last path segment of its ``Target`` is recorded
    together with the slide number N. Unreadable files are reported on stdout
    and skipped.

    Returns:
        Dict from target file name to the sorted list of slide numbers; empty
        when *rels_dir* does not exist or is not a directory.
    """
    if not (rels_dir.exists() and rels_dir.is_dir()):
        return {}
    relationship_path = ".//{http://schemas.openxmlformats.org/package/2006/relationships}Relationship"
    slides_by_target: dict[str, set[int]] = {}
    for rels_path in rels_dir.iterdir():
        rels_file = rels_path.name
        looks_like_slide_rels = (
            rels_file.startswith("slide")
            and rels_file.endswith(".xml.rels")
            and rels_path.is_file()
        )
        if not looks_like_slide_rels:
            continue
        match = re.search(r"slide(\d+)\.xml\.rels$", rels_file)
        if not match:
            continue
        slide_number = int(match.group(1))
        try:
            for rel in ET.parse(rels_path).getroot().findall(relationship_path):
                target_name = Path(rel.attrib.get("Target", "")).name
                if target_name:
                    slides_by_target.setdefault(target_name, set()).add(slide_number)
        except (ET.ParseError, OSError):
            print(f"Fehler beim Lesen von {rels_file}")
    return {name: sorted(numbers) for name, numbers in slides_by_target.items()}
def _replace_via_tmp(source: Path, target: Path) -> None:
    """Copy *source* to ``<target name>.tmp`` next to *target*, then move it over *target*."""
    tmp_target = target.with_name(target.name + ".tmp")
    shutil.copy2(source, tmp_target)
    tmp_target.replace(target)


def process_image_file(
    idx: int,
    img_path: Path,
    scratch_dir: Path,
    image_to_slides: dict[str, List[int]],
    caesium_threads: int | None,
    quality: int,
    min_savings: str,
    compressor: Callable[..., Path | None],
) -> ImageProcessResult:
    """Compress one media file in place and report what happened.

    Step 1 compresses the image in its own format and replaces the file when
    the result is smaller. Step 2 applies only to PNGs that are still larger
    than PNG_TO_JPEG_THRESHOLD_BYTES: the image is converted to JPEG and, when
    that is smaller, stored as ``<stem>.jpg`` while the PNG is removed.

    Args:
        idx: Running number of the image; selects the scratch sub-directory.
        img_path: Image inside the unpacked presentation.
        scratch_dir: Root for compressor output, outside the presentation tree.
        image_to_slides: Index from build_image_slide_index().
        caesium_threads: Passed through to the compressor.
        quality: Passed through to the compressor.
        min_savings: Passed through to the compressor for step 1.
        compressor: Callable that performs the raster compression.

    Returns:
        ImageProcessResult with the final file name and size. Errors are
        reported on stderr and never raised; the result then describes the
        state reached before the failing step.
    """
    orig_size = img_path.stat().st_size
    chosen_size = orig_size
    chosen_name = img_path.name
    image_type_changed = ""
    found_in_slide = image_to_slides.get(img_path.name)
    slide_nr = "NOT_USED" if found_in_slide is None else str(found_in_slide)
    out_sub = scratch_dir / f"img_{idx:06d}"

    # Step 1: same-format compression. chosen_size changes only after the
    # file was really replaced, so a failure leaves a truthful result.
    primary_ok = True
    try:
        caesium_out = compress_image_with_routing(
            compressor=compressor,
            original=img_path,
            out_dir=out_sub,
            caesium_threads=caesium_threads,
            quality=quality,
            min_savings=min_savings,
        )
        if caesium_out and caesium_out.exists():
            compressed_size = caesium_out.stat().st_size
            if compressed_size < orig_size:
                _replace_via_tmp(caesium_out, img_path)
                chosen_size = compressed_size
    except Exception as exc:
        primary_ok = False
        sys.stderr.write(f"[image] Fehler bei {img_path.name}: {exc}\n")

    # Step 2: PNG -> JPEG fallback, handled separately so that a failure here
    # does not discard the size already achieved in step 1.
    # NOTE(review): JPEG has no alpha channel, so a transparent PNG converted
    # here loses its transparency - confirm that this is acceptable.
    jpg_target = img_path.with_suffix(".jpg")
    wants_jpeg = (
        primary_ok
        and img_path.suffix.lower() == ".png"
        and chosen_size > PNG_TO_JPEG_THRESHOLD_BYTES
        # Never overwrite a different image that already uses "<stem>.jpg".
        and not jpg_target.exists()
    )
    if wants_jpeg:
        try:
            jpg_candidate = run_compressor(
                compressor=compressor,
                original=img_path,
                out_dir=out_sub,
                caesium_threads=caesium_threads,
                quality=quality,
                min_savings="0%",
                output_format="jpeg",
            )
            if jpg_candidate and jpg_candidate.exists():
                jpg_size = jpg_candidate.stat().st_size
                if jpg_size < chosen_size:
                    _replace_via_tmp(jpg_candidate, jpg_target)
                    img_path.unlink(missing_ok=True)
                    chosen_size = jpg_size
                    chosen_name = jpg_target.name
                    image_type_changed = "png_jpg"
        except Exception as exc:
            sys.stderr.write(f"[image] Fehler bei {img_path.name} (PNG->JPG): {exc}\n")

    return ImageProcessResult(
        image_name=chosen_name,
        orig_size=orig_size,
        chosen_size=chosen_size,
        slide_nr=slide_nr,
        image_type_changed=image_type_changed,
    )
# -------------------- Core per-deck processing --------------------
def process_single_deck(
    input_pptx: Path,
    output_pptx: Path,
    threads: int,
    quality: int,
    min_savings: str,
    compressor: Callable[..., Path | None] = compress_with_caesium,
) -> DeckResult:
    """Compress all images of one presentation and write the result.

    The presentation is unpacked into a temporary directory, every media file
    is processed by process_image_file() on a thread pool, relationships and
    content types are adjusted for images whose type changed, and the tree is
    zipped to *output_pptx*. A CSV log is written next to the output.

    Args:
        input_pptx: Existing ``.pptx`` file to read.
        output_pptx: File to create; an existing one is removed first.
        threads: Number of worker threads (at least one is used).
        quality: Passed through to the compressor.
        min_savings: Passed through to the compressor.
        compressor: Callable that performs the raster compression.

    Returns:
        DeckResult; on any failure ``ok`` stays False and ``error`` holds the
        exception message. Exceptions are not propagated.
    """
    start_time = time.perf_counter()
    result = DeckResult(
        input=str(input_pptx),
        output=str(output_pptx),
    )
    work_dir: Optional[Path] = None
    scratch_dir: Optional[Path] = None
    try:
        if not input_pptx.exists() or input_pptx.suffix.lower() != ".pptx":
            raise ValueError("Eingabedatei existiert nicht oder ist keine .pptx")
        # The output is deleted before the input is read, so writing onto the
        # input itself would destroy the source file.
        if input_pptx.resolve() == output_pptx.resolve():
            raise ValueError("Ausgabedatei darf nicht mit der Eingabedatei identisch sein")
        cleanup_old_temps()
        ensure_clean_file(output_pptx)
        work_dir = Path(tempfile.mkdtemp(prefix=TEMP_PREFIX + "work_"))
        scratch_dir = Path(tempfile.mkdtemp(prefix=TEMP_PREFIX + "scratch_"))
        log_file = output_pptx.with_suffix(".log.csv")
        ensure_clean_file(log_file)
        log_lines = ["image_name;size_before(kb);size_after(kb);saving(kb);saving_percent(%);in_slide_number;image_type_changed\n"]
        size_before = input_pptx.stat().st_size
        result.size_before = size_before
        with zipfile.ZipFile(input_pptx, "r") as z:
            z.extractall(work_dir)
        slides_dir = work_dir / "ppt" / "slides"
        rels_dir = slides_dir / "_rels"
        media_dir = work_dir / "ppt" / "media"
        images = discover_images(media_dir)
        total = len(images)
        print(f"[Processing] {input_pptx.name}: {total} Bild(er) gefunden")
        print_progress(0, total)
        if not which("caesiumclt") and compressor is compress_with_caesium:
            raise RuntimeError("'caesiumclt' nicht gefunden. Bitte installieren und in PATH verfügbar machen.")
        # With several worker threads each caesiumclt call is limited to one thread.
        caesium_threads = 1 if threads > 1 else None
        lock = Lock()
        done_count = 0
        image_to_slides = build_image_slide_index(rels_dir)
        renamed_images: list[tuple[str, str]] = []

        def worker(idx: int, img_path: Path):
            nonlocal done_count
            image_result = process_image_file(
                idx=idx,
                img_path=img_path,
                scratch_dir=scratch_dir,
                image_to_slides=image_to_slides,
                caesium_threads=caesium_threads,
                quality=quality,
                min_savings=min_savings,
                compressor=compressor,
            )
            # Shared bookkeeping and console output are serialized.
            with lock:
                if image_result.image_name != img_path.name:
                    renamed_images.append((img_path.name, image_result.image_name))
                log_lines.append(image_result_to_log_line(image_result))
                done_count += 1
                print_progress(done_count, total)

        if total > 0:
            with ThreadPoolExecutor(max_workers=max(1, threads)) as ex:
                futures = [ex.submit(worker, i, p) for i, p in enumerate(images, start=1)]
                for fut in as_completed(futures):
                    try:
                        fut.result()
                    except Exception as exc:
                        sys.stderr.write(f"[worker] Unerwarteter Fehler: {exc}\n")
        if renamed_images:
            for old_name, new_name in renamed_images:
                update_relationship_targets(work_dir, old_name, new_name)
            ensure_jpg_content_type(work_dir)
        print()  # newline after the progress bar
        # Safety cleanup inside work_dir. The listing is materialized first so
        # that removing a directory cannot disturb the running traversal.
        for p in list(work_dir.rglob("*")):
            try:
                if p.is_dir() and p.name.lower().startswith("caesium"):
                    shutil.rmtree(p, ignore_errors=True)
            except Exception:
                pass
        if media_dir.exists():
            for f in media_dir.iterdir():
                if f.is_file() and f.suffix.lower() == ".tmp":
                    try:
                        f.unlink(missing_ok=True)
                    except Exception:
                        pass
        zip_dir_to_pptx(work_dir, output_pptx)
        size_after = output_pptx.stat().st_size
        result.size_after = size_after
        try:
            with open(log_file, "w", encoding="utf-8") as f:
                f.writelines(log_lines)
        except Exception:
            # The log is optional; the compressed deck is already written.
            pass
        elapsed = time.perf_counter() - start_time
        result.elapsed_sec = elapsed
        result.log_file = str(log_file)
        result.ok = True
        savings_pct = 0.0 if size_before == 0 else round(100.0 * (size_before - size_after) / size_before, 2)
        print(f"[OK] Fertig! ({input_pptx.name})")
        print("Zusammenfassung ----------------")
        print(" Vorher: ", human_mb(size_before), "MB")
        print(" Nachher: ", human_mb(size_after), "MB")
        print(" Ersparnis: ", f"{savings_pct}%")
        print(" Zeit: ", format_duration(elapsed))
        print(" Log: ", log_file)
    except Exception as e:
        result.error = str(e)
    finally:
        if work_dir is not None:
            shutil.rmtree(work_dir, ignore_errors=True)
        if scratch_dir is not None:
            shutil.rmtree(scratch_dir, ignore_errors=True)
        cleanup_old_temps()
    return result
# -------------------- Input helpers --------------------
def expand_inputs(inputs: list[str]) -> list[Path]:
    """Turn the raw ``--input`` values into a de-duplicated list of .pptx paths.

    A value containing ``*`` or ``?`` is expanded with glob; a directory
    contributes its direct ``*.pptx`` children; any other value is taken as a
    file path when its suffix is ``.pptx`` (the file need not exist). All
    paths are resolved, and the first occurrence of each path wins.
    """
    candidates: list[Path] = []
    for raw in inputs:
        if "*" in raw or "?" in raw:
            candidates.extend(
                Path(hit).resolve() for hit in glob(raw) if hit.lower().endswith('.pptx')
            )
            continue
        entry = Path(raw)
        if entry.is_dir():
            candidates.extend(found.resolve() for found in entry.glob('*.pptx'))
        elif entry.suffix.lower() == '.pptx':
            candidates.append(entry.resolve())
    # De-duplicate on the string form while keeping first-seen order.
    unique: dict[str, Path] = {}
    for candidate in candidates:
        unique.setdefault(str(candidate), candidate)
    return list(unique.values())
def collect_from_dir(input_dir: Path, pattern: str, recursive: bool) -> list[Path]:
    """Collect the .pptx files in *input_dir* whose name matches *pattern*.

    With *recursive* the whole tree is walked and file names are matched with
    fnmatch; otherwise *pattern* is globbed directly inside *input_dir*. Only
    paths with a ``.pptx`` suffix are kept. Paths are resolved and
    de-duplicated, first occurrence first.
    """
    if recursive:
        matches = [
            Path(folder) / name
            for folder, _, names in os.walk(input_dir)
            for name in names
            if fnmatch.fnmatch(name, pattern)
        ]
    else:
        matches = list(input_dir.glob(pattern))
    unique: dict[str, Path] = {}
    for match in matches:
        if match.suffix.lower() != '.pptx':
            continue
        resolved = match.resolve()
        unique.setdefault(str(resolved), resolved)
    return list(unique.values())
# -------------------- CLI --------------------
def main():
    """Command line entry point: compress one deck or a batch of decks.

    More than one input file selects batch mode, which requires
    ``--output-dir`` (exit code 2 otherwise). A missing ``caesiumclt`` ends the
    program with exit code 3. Batch mode prints an overall summary at the end.
    """
    # NOTE(review): the process exits normally even when decks failed -
    # confirm whether callers need a non-zero exit code in that case.
    parser, args = extractParserArguments()
    input_files = validateParserArguments(parser, args)
    batch_mode = len(input_files) > 1
    if batch_mode and not args.output_dir:
        print('[ERROR] Batch-Modus erkannt. Bitte -O/--output-dir angeben.')
        sys.exit(2)
    if not which('caesiumclt'):
        print("[ERROR] 'caesiumclt' nicht gefunden. Bitte installieren und in PATH verfügbar machen.")
        sys.exit(3)

    overall_before = overall_after = successes = failures = 0

    def compress_and_tally(src, dst):
        # Process one deck and fold its outcome into the running totals.
        nonlocal overall_before, overall_after, successes, failures
        res = process_single_deck(src, dst, args.threads, args.quality, args.min_savings)
        if not res.ok:
            failures += 1
            print(f" Fehler: {src.name} -> {res.error}")
            return
        successes += 1
        overall_before += res.size_before
        overall_after += res.size_after

    if batch_mode:
        out_dir = Path(args.output_dir).resolve()
        out_dir.mkdir(parents=True, exist_ok=True)
        print(f"Batch: {len(input_files)} Datei(en). Output-Verzeichnis: {out_dir}")
        for src in input_files:
            if not src.exists():
                print(f"- Übersprungen (nicht gefunden): {src}")
                failures += 1
                continue
            compress_and_tally(src, out_dir / f"{src.stem}_compressed.pptx")
    else:
        src = input_files[0]
        if args.output_dir:
            Path(args.output_dir).mkdir(parents=True, exist_ok=True)
            dst = Path(args.output_dir) / f"{src.stem}_compressed.pptx"
        elif args.output:
            dst = Path(args.output).resolve()
        else:
            dst = src.with_name(f"{src.stem}_compressed.pptx")
        compress_and_tally(src, dst)

    if batch_mode:
        print(f"====== Gesamt-Summary ======")
        print(f"[SUCCESS] Dateien erfolgreich: {successes}")
        if failures > 0:
            print(f"[FAILED] Dateien fehlgeschlagen: {failures}")
        pct = 0.0
        if overall_before > 0:
            pct = round(100.0 * (overall_before - overall_after) / overall_before, 2)
        print(f"Gesamtgröße vorher: {human_mb(overall_before)} MB")
        print(f"Gesamtgröße nachher: {human_mb(overall_after)} MB")
        print(f"Gesamt-Ersparnis: {pct}%")
def validateParserArguments(parser, args):
    """Validate the parsed arguments and return the list of input files.

    Exits with code 1 when the quality is outside 0..100, or when neither
    ``--input`` nor ``--input-dir`` yields a file (the help text is printed in
    that case).
    """
    print("Threads used: ", args.threads," Threads")
    if not 0 <= args.quality <= 100:
        print('[ERROR] Ungültige Qualität. Erlaubt: 0..100')
        sys.exit(1)
    input_files: list[Path] = []
    if args.input:
        input_files += expand_inputs(args.input)
    if args.input_dir:
        input_files += collect_from_dir(Path(args.input_dir), args.pattern, args.recursive)
    if not input_files:
        parser.print_help()
        sys.exit(1)
    return input_files
def extractParserArguments():
    """Build the command line parser and parse the process arguments.

    Returns:
        Tuple ``(parser, args)`` of the ArgumentParser and the parsed namespace.
    """
    parser = argparse.ArgumentParser(
        description="PPTX Grafik-Komprimier-Tool (nur CaesiumCLT, Multi-Thread, Batch, sauberes Cleanup)",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    add = parser.add_argument
    # Input / output selection.
    add(
        '-i', '--input',
        nargs='*',
        help='Input-PPTX (eine oder mehrere, Wildcards erlaubt). Bei mehreren: -O erforderlich.',
    )
    add('--input-dir', help='Eingabe-Verzeichnis (optional, für Batch)')
    add('-o', '--output', help='Output-PPTX (nur Single-Mode)')
    add('-O', '--output-dir', help='Output-Verzeichnis (erforderlich für Batch)')
    add('--pattern', default='*.pptx', help='Dateimuster für --input-dir')
    add('--recursive', action='store_true', help='Rekursiv in --input-dir suchen')
    # Compression settings. A CPU-count based thread default,
    # min(32, os.cpu_count() or 4), was disabled in favour of the fixed value.
    add(
        '-t', '--threads',
        type=int,
        default=16,
        help='Anzahl paralleler Threads pro Datei',
    )
    add(
        '-q', '--quality',
        type=int,
        default=90,
        help='Qualität für caesiumclt (0..100), höher = bessere Qualität / größere Datei',
    )
    add(
        '--min-savings',
        default=DEFAULT_MIN_SAVINGS,
        help="Mindestersparnis für caesiumclt (z. B. 2%%, 100KB, 1MB oder Bytes als Zahl)",
    )
    add(
        '--version',
        action='version',
        version=f'%(prog)s {__version__}',
        help="Zeigt die Versionsnummer an",
    )
    args = parser.parse_args()
    return parser, args
# Run the command line interface only when executed as a script.
if __name__ == "__main__":
    main()