4 Commits

Author SHA1 Message Date
c69ec1eecb fix: apply medium/high severity code review findings
- Re-raise worker futures in as_completed to surface thread exceptions
- Replace hardcoded extension set with ALLOWED_EXT constant in compress_with_caesium
- Initialise work_dir/scratch_dir to None before try block to prevent NameError in finally
- Remove unused dead function get_slide_numbers_for_image
- Simplify redundant caesium_threads guard (threads and threads > 1 -> threads > 1)
- Write [Content_Types].xml first in ZIP to satisfy OOXML spec

Co-Authored-By: Abacus.AI CLI <agent@abacus.ai>
2026-04-09 10:26:45 +02:00
252b2c2cd5 readme auf 1.1.6 angepasst 2026-04-09 10:14:18 +02:00
698aac0aba Refactor and UnitTest 2026-04-09 10:10:57 +02:00
332e62b764 Funktionalität min_savings der caesiumclt 1.3.0 implementiert (default: 2%), log-Datei um "Bild in Folien Nr." ergänzt. 2026-04-09 09:40:19 +02:00
5 changed files with 330 additions and 115 deletions

1
.gitignore vendored
View File

@@ -2,3 +2,4 @@ python-3.*-embed-amd64.zip
python-embed/* python-embed/*
.vscode/launch.json .vscode/launch.json
logs/*.log logs/*.log
__pycache__/*

11
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,11 @@
{
"python.testing.unittestArgs": [
"-v",
"-s",
".",
"-p",
"test_*.py"
],
"python.testing.pytestEnabled": false,
"python.testing.unittestEnabled": true
}

View File

@@ -1,6 +1,6 @@
# PPTX Image Compressor (CaesiumCLT only) # PPTX Image Compressor (CaesiumCLT only)
**Version 1.1.4** **Version 1.1.6**
Dieses Paket enthält: Dieses Paket enthält:
@@ -11,8 +11,6 @@ PPTX-Image-Compressor/
├─ pptx_image_compress.py ├─ pptx_image_compress.py
├─ bin/ ├─ bin/
│ └─ caesiumclt.exe │ └─ caesiumclt.exe
└─ samples/
└─ README.txt
``` ```
## Schnellstart (ohne Admin-Rechte) ## Schnellstart (ohne Admin-Rechte)
@@ -34,6 +32,7 @@ Die Batch lädt bei Bedarf automatisch das **Windows Embeddable Python Package**
- Entpackt die PPTX in einen TempOrdner - Entpackt die PPTX in einen TempOrdner
- Komprimiert **JPG/JPEG, PNG, WebP, GIF** mit **CaesiumCLT** (Default `-q 90`, `-O bigger`) - Komprimiert **JPG/JPEG, PNG, WebP, GIF** mit **CaesiumCLT** (Default `-q 90`, `-O bigger`)
- Ersetzt Bilder nur, wenn die komprimierte Datei kleiner ist - Ersetzt Bilder nur, wenn die komprimierte Datei kleiner ist
- Ersetzt Bilder nur, wenn sie mindestens 2% kleiner sind (verhindert *doppelte Komprimierung*)
- Schreibt ein CSVLog (`.log` neben der OutputPPTX) - Schreibt ein CSVLog (`.log` neben der OutputPPTX)
- Baut eine neue PPTX und zeigt eine Summary (Name, Größe vorher/nachher, Ersparnis %, Zeit) - Baut eine neue PPTX und zeigt eine Summary (Name, Größe vorher/nachher, Ersparnis %, Zeit)
- Räumt alle temporären Dateien auf (keine CaesiumTempfiles in der finalen PPTX) - Räumt alle temporären Dateien auf (keine CaesiumTempfiles in der finalen PPTX)
@@ -41,6 +40,7 @@ Die Batch lädt bei Bedarf automatisch das **Windows Embeddable Python Package**
## Hinweise ## Hinweise
- `-t` steuert die Parallelität der PythonThreads; intern wird `caesiumclt --threads 1` gesetzt, sobald `-t > 1`, um Oversubscription zu vermeiden. Default ist 16 - `-t` steuert die Parallelität der PythonThreads; intern wird `caesiumclt --threads 1` gesetzt, sobald `-t > 1`, um Oversubscription zu vermeiden. Default ist 16
- `-q` steuert das Qualitätslevel; intern wird `caesiumclt -q` mit diesem Wert von `0..100` benutzt, Default ist 90 - `-q` steuert das Qualitätslevel; intern wird `caesiumclt -q` mit diesem Wert von `0..100` benutzt, Default ist 90
- `--min-savings` steuert das Mindestmass an Komprimierung zur Verhinderung von doppelter Komprimierung, Default ist 2%
- Die Batch **verwendet bevorzugt das Embeddable Python** neben der BAT; ansonsten sucht sie echte `python.exe`/`py.exe` im PATH, **ignoriert** aber die MicrosoftStoreAliasPfade (`WindowsApps`). - Die Batch **verwendet bevorzugt das Embeddable Python** neben der BAT; ansonsten sucht sie echte `python.exe`/`py.exe` im PATH, **ignoriert** aber die MicrosoftStoreAliasPfade (`WindowsApps`).
## Manuelle Nutzung des .py (falls Python vorhanden) ## Manuelle Nutzung des .py (falls Python vorhanden)

View File

@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
PPTX Grafik-Komprimier-Tool (nur CaesiumCLT, Multi-Thread, Batch, sauberes Cleanup) PPTX Grafik-Komprimier-Tool (nur CaesiumCLT, Multi-Thread, Batch, sauberes Cleanup)
Version: 1.1.4 Version: 1.1.6
Highlights: Highlights:
@@ -12,8 +12,8 @@ Highlights:
- Log: image_name,size_before,size_after,saving,saving_percent - Log: image_name,size_before,size_after,saving,saving_percent
- Summary inkl. Zeit benötigt - Summary inkl. Zeit benötigt
Änderungen in 1.1.4: Änderungen in 1.1.6:
- Libcaesium 1.1.0 kann nun auch gif verkleinern - Libcaesium 1.3.0 kann nun auch files ignorieren, wenn die Kompression kleiner als <MIN_SAVING> ist
""" """
import argparse import argparse
@@ -32,14 +32,51 @@ from pathlib import Path
from datetime import timedelta from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock from threading import Lock
from typing import List, Optional from dataclasses import dataclass
from typing import Callable, List, Optional
__version__ = "1.1.4" __version__ = "1.1.6"
ALLOWED_EXT = {".jpg", ".jpeg", ".png", ".webp", ".gif"} ALLOWED_EXT = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
PROGRESS_BAR_LEN = 40 PROGRESS_BAR_LEN = 40
TEMP_PREFIX = "pptx_compress_" TEMP_PREFIX = "pptx_compress_"
DEFAULT_MIN_SAVINGS = "2%"
@dataclass
class DeckResult:
input: str
output: str
ok: bool = False
size_before: int = 0
size_after: int = 0
elapsed_sec: float = 0.0
error: Optional[str] = None
log_file: Optional[str] = None
@dataclass
class ImageProcessResult:
    """Per-image outcome used to build one line of the log file."""

    image_name: str  # file name of the image (no directory part)
    orig_size: int  # size of the image before compression
    chosen_size: int  # size of the file finally kept (equals orig_size if not replaced)
    slide_nr: str  # textual slide reference, e.g. "[1, 2]" or "NOT_USED"
def discover_images(media_dir: Path) -> list[Path]:
    """Return the compressible image files directly inside *media_dir*.

    Entries are visited in sorted order and kept only when they are regular
    files whose lower-cased suffix is listed in ``ALLOWED_EXT``. A missing
    directory yields an empty list.
    """
    if not media_dir.exists():
        return []
    return [
        entry
        for entry in sorted(media_dir.iterdir())
        if entry.is_file() and entry.suffix.lower() in ALLOWED_EXT
    ]
def image_result_to_log_line(image_result: ImageProcessResult) -> str:
    """Format *image_result* as one semicolon-separated, newline-terminated log row.

    Columns: image name, size before, size after, absolute saving (all three
    sizes rendered through ``human_kb``), saving in percent rounded to two
    decimals, and the slide reference.
    """
    before = image_result.orig_size
    after = image_result.chosen_size
    saved = before - after
    # Guard against division by zero for empty files.
    if before > 0:
        saved_pct = round((saved / before) * 100, 2)
    else:
        saved_pct = 0.0
    return (
        f"{image_result.image_name};{human_kb(before)};{human_kb(after)};"
        f"{human_kb(saved)};{saved_pct};{image_result.slide_nr}\n"
    )
# -------------------- Utilities -------------------- # -------------------- Utilities --------------------
@@ -79,25 +116,31 @@ def print_progress(i: int, total: int):
print(f"\rBilder: |{bar}| {i}/{total} ({pct}%)", end="", flush=True) print(f"\rBilder: |{bar}| {i}/{total} ({pct}%)", end="", flush=True)
def zip_dir_to_pptx(src_dir: Path, out_pptx: Path): def zip_dir_to_pptx(src_dir: Path, out_pptx: Path):
all_files: list[Path] = []
for root, _, files in os.walk(src_dir):
for f in files:
all_files.append(Path(root) / f)
content_types = [f for f in all_files if f.name == "[Content_Types].xml"]
rest = [f for f in all_files if f.name != "[Content_Types].xml"]
with zipfile.ZipFile(out_pptx, "w", compression=zipfile.ZIP_DEFLATED) as z: with zipfile.ZipFile(out_pptx, "w", compression=zipfile.ZIP_DEFLATED) as z:
for root, _, files in os.walk(src_dir): for full in content_types + rest:
for f in files: rel = full.relative_to(src_dir)
full = Path(root) / f z.write(full, arcname=str(rel))
rel = full.relative_to(src_dir)
z.write(full, arcname=str(rel))
def which(cmd: str): def which(cmd: str):
return shutil.which(cmd) return shutil.which(cmd)
def compress_with_caesium(original: Path, out_dir: Path, caesium_threads: int | None, quality: int) -> Path | None: def compress_with_caesium(original: Path, out_dir: Path, caesium_threads: int | None, quality: int, min_savings: str) -> Path | None:
exe = which("caesiumclt") exe = which("caesiumclt")
if not exe: if not exe:
raise RuntimeError("[ERROR] 'caesiumclt' wurde nicht gefunden. Bitte CaesiumCLT installieren und in PATH verfügbar machen.") raise RuntimeError("[ERROR] 'caesiumclt' wurde nicht gefunden. Bitte CaesiumCLT installieren und in PATH verfügbar machen.")
out_dir.mkdir(parents=True, exist_ok=True) out_dir.mkdir(parents=True, exist_ok=True)
ext = original.suffix.lower() ext = original.suffix.lower()
if ext not in {".jpg", ".jpeg", ".png", ".webp", ".gif"}: if ext not in ALLOWED_EXT:
return None return None
cmd = [exe, "-q", str(quality), "-O", "bigger", "-o", str(out_dir)] cmd = [exe, "-q", str(quality), "-O", "bigger", "--min-savings", min_savings, "-o", str(out_dir)]
if caesium_threads is not None: if caesium_threads is not None:
cmd += ["--threads", str(caesium_threads)] cmd += ["--threads", str(caesium_threads)]
cmd += [str(original)] cmd += [str(original)]
@@ -121,51 +164,87 @@ def format_duration(seconds: float) -> str:
return f"{hms}.{frac[:2]}" return f"{hms}.{frac[:2]}"
return base return base
def get_slide_numbers_for_image(rels_dir: str, image_filename: str) -> Optional[List[int]]: def build_image_slide_index(rels_dir: Path) -> dict[str, List[int]]:
""" if not rels_dir.exists() or not rels_dir.is_dir():
Durchsucht alle .rels-Dateien im angegebenen Verzeichnis und gibt die Slide-Nummern zurück, return {}
in denen die angegebene Bilddatei referenziert wird.
:param rels_dir: Pfad zum Verzeichnis ppt/slides/_rels image_to_slides: dict[str, set[int]] = {}
:param image_filename: z.B. 'image80.png'
:return: Liste von Slide-Nummern oder None
"""
slide_numbers = []
for rels_file in os.listdir(rels_dir): for rels_path in rels_dir.iterdir():
if rels_file.startswith("slide") and rels_file.endswith(".xml.rels"): rels_file = rels_path.name
rels_path = os.path.join(rels_dir, rels_file) if rels_file.startswith("slide") and rels_file.endswith(".xml.rels") and rels_path.is_file():
match = re.search(r"slide(\d+)\.xml\.rels$", rels_file)
if not match:
continue
slide_number = int(match.group(1))
try: try:
tree = ET.parse(rels_path) tree = ET.parse(rels_path)
root = tree.getroot() root = tree.getroot()
for rel in root.findall(".//{http://schemas.openxmlformats.org/package/2006/relationships}Relationship"): for rel in root.findall(".//{http://schemas.openxmlformats.org/package/2006/relationships}Relationship"):
target = rel.attrib.get("Target", "") target = rel.attrib.get("Target", "")
if image_filename in target: image_name = Path(target).name
match = re.search(r"slide(\d+).xml.rels", rels_file) if image_name:
if match: if image_name not in image_to_slides:
slide_number = int(match.group(1)) image_to_slides[image_name] = set()
slide_numbers.append(slide_number) image_to_slides[image_name].add(slide_number)
except ET.ParseError: except (ET.ParseError, OSError):
print(f"Fehler beim Parsen von {rels_file}") print(f"Fehler beim Lesen von {rels_file}")
return slide_numbers if slide_numbers else None return {img: sorted(slides) for img, slides in image_to_slides.items()}
def process_image_file(
    idx: int,
    img_path: Path,
    scratch_dir: Path,
    image_to_slides: dict[str, List[int]],
    caesium_threads: int | None,
    quality: int,
    min_savings: str,
    compressor: Callable[[Path, Path, int | None, int, str], Path | None],
) -> ImageProcessResult:
    """Compress one image and swap it in place when the result is smaller.

    Args:
        idx: Running number of the image; used to derive a private output
            sub-directory ``img_<idx>`` below *scratch_dir*.
        img_path: Image file to compress; replaced in place on success.
        scratch_dir: Parent directory for the compressor's output.
        image_to_slides: Mapping of image file name to the slide numbers
            referencing it; a missing name is reported as ``"NOT_USED"``.
        caesium_threads: Passed through to *compressor* unchanged.
        quality: Passed through to *compressor* unchanged.
        min_savings: Passed through to *compressor* unchanged.
        compressor: Callable producing the compressed file, or ``None`` when
            it has nothing to offer.

    Returns:
        An ``ImageProcessResult`` whose ``chosen_size`` equals the original
        size whenever the image was not replaced.

    Raises:
        OSError: If *img_path* itself cannot be stat'ed. Failures of the
            compressor or of the replacement are deliberately swallowed so
            that a single bad image never aborts the whole deck.
    """
    orig_size = img_path.stat().st_size
    chosen_size = orig_size

    found_in_slide = image_to_slides.get(img_path.name)
    slide_nr = "NOT_USED" if found_in_slide is None else str(found_in_slide)

    try:
        out_sub = scratch_dir / f"img_{idx:06d}"
        caesium_out = compressor(img_path, out_sub, caesium_threads, quality, min_savings)
        if caesium_out and caesium_out.exists():
            new_size = caesium_out.stat().st_size
            if new_size < orig_size:
                # Copy next to the target first, then rename over it, so the
                # original is never left half-written.
                tmp_target = img_path.with_suffix(img_path.suffix + ".tmp")
                try:
                    shutil.copy2(caesium_out, tmp_target)
                    tmp_target.replace(img_path)
                except Exception:
                    # A stray "<name>.tmp" left in the media directory would
                    # otherwise be packed into the rebuilt PPTX.
                    tmp_target.unlink(missing_ok=True)
                    raise
                chosen_size = new_size
    except Exception:
        # Best effort: keep the original image and report no saving.
        chosen_size = orig_size

    return ImageProcessResult(
        image_name=img_path.name,
        orig_size=orig_size,
        chosen_size=chosen_size,
        slide_nr=slide_nr,
    )
# -------------------- Core per-deck processing -------------------- # -------------------- Core per-deck processing --------------------
def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quality: int) -> dict: def process_single_deck(
input_pptx: Path,
output_pptx: Path,
threads: int,
quality: int,
min_savings: str,
compressor: Callable[[Path, Path, int | None, int, str], Path | None] = compress_with_caesium,
) -> DeckResult:
start_time = time.perf_counter() start_time = time.perf_counter()
result = { result = DeckResult(
"input": str(input_pptx), input=str(input_pptx),
"output": str(output_pptx), output=str(output_pptx),
"ok": False, )
"size_before": 0, work_dir: Optional[Path] = None
"size_after": 0, scratch_dir: Optional[Path] = None
"elapsed_sec": 0.0,
"error": None,
"log_file": None,
}
try: try:
if not input_pptx.exists() or input_pptx.suffix.lower() != ".pptx": if not input_pptx.exists() or input_pptx.suffix.lower() != ".pptx":
@@ -176,77 +255,60 @@ def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quali
work_dir = Path(tempfile.mkdtemp(prefix=TEMP_PREFIX + "work_")) work_dir = Path(tempfile.mkdtemp(prefix=TEMP_PREFIX + "work_"))
scratch_dir = Path(tempfile.mkdtemp(prefix=TEMP_PREFIX + "scratch_")) scratch_dir = Path(tempfile.mkdtemp(prefix=TEMP_PREFIX + "scratch_"))
log_file = output_pptx.with_suffix(".log.csv") log_file = output_pptx.with_suffix(".log.csv")
ensure_clean_file(log_file) ensure_clean_file(log_file)
log_lines = ["image_name;size_before(kb);size_after(kb);saving(kb);saving_percent(%);in_slide_number\n"] log_lines = ["image_name;size_before(kb);size_after(kb);saving(kb);saving_percent(%);in_slide_number\n"]
size_before = input_pptx.stat().st_size size_before = input_pptx.stat().st_size
result["size_before"] = size_before result.size_before = size_before
with zipfile.ZipFile(input_pptx, "r") as z: with zipfile.ZipFile(input_pptx, "r") as z:
z.extractall(work_dir) z.extractall(work_dir)
slides_dir = work_dir / "ppt" / "slides" slides_dir = work_dir / "ppt" / "slides"
rels_dir = slides_dir / "_rels"
media_dir = work_dir / "ppt" / "media" media_dir = work_dir / "ppt" / "media"
images = [] images = discover_images(media_dir)
if media_dir.exists():
for f in sorted(media_dir.iterdir()):
if f.is_file() and f.suffix.lower() in ALLOWED_EXT:
images.append(f)
total = len(images) total = len(images)
print(f"[Processing] {input_pptx.name}: {total} Bild(er) gefunden") print(f"[Processing] {input_pptx.name}: {total} Bild(er) gefunden")
print_progress(0, total) print_progress(0, total)
if not which("caesiumclt"): if not which("caesiumclt") and compressor is compress_with_caesium:
raise RuntimeError("'caesiumclt' nicht gefunden. Bitte installieren und in PATH verfügbar machen.") raise RuntimeError("'caesiumclt' nicht gefunden. Bitte installieren und in PATH verfügbar machen.")
caesium_threads = 1 if threads and threads > 1 else None caesium_threads = 1 if threads > 1 else None
lock = Lock() lock = Lock()
done_count = 0 done_count = 0
image_to_slides = build_image_slide_index(rels_dir)
def worker(idx: int, img_path: Path): def worker(idx: int, img_path: Path):
nonlocal done_count nonlocal done_count
ext = img_path.suffix.lower() image_result = process_image_file(
orig_size = img_path.stat().st_size idx=idx,
chosen_size = orig_size img_path=img_path,
found_in_slide=None scratch_dir=scratch_dir,
slide_nr="" image_to_slides=image_to_slides,
caesium_threads=caesium_threads,
quality=quality,
min_savings=min_savings,
compressor=compressor,
)
try: with lock:
found_in_slide = get_slide_numbers_for_image(slides_dir.name, img_path.name) log_lines.append(image_result_to_log_line(image_result))
if found_in_slide is None: done_count += 1
slide_nr = "NOT_USED" print_progress(done_count, total)
else:
slide_nr = str(found_in_slide)
out_sub = scratch_dir / f"img_{idx:06d}"
caesium_out = compress_with_caesium(img_path, out_sub, caesium_threads, quality)
if caesium_out and caesium_out.exists():
s = caesium_out.stat().st_size
if s < orig_size:
tmp_target = img_path.with_suffix(img_path.suffix + ".tmp")
shutil.copy2(caesium_out, tmp_target)
tmp_target.replace(img_path)
chosen_size = s
except Exception:
chosen_size = orig_size
finally:
saving = orig_size - chosen_size
saving_percent = round((saving / orig_size) * 100, 2) if orig_size > 0 else 0.0
with lock:
log_lines.append(f"{img_path.name};{human_kb(orig_size)};{human_kb(chosen_size)};{human_kb(saving)};{saving_percent};{slide_nr}\n")
done_count += 1
print_progress(done_count, total)
if total > 0: if total > 0:
with ThreadPoolExecutor(max_workers=max(1, threads)) as ex: with ThreadPoolExecutor(max_workers=max(1, threads)) as ex:
futures = [ex.submit(worker, i, p) for i, p in enumerate(images, start=1)] futures = [ex.submit(worker, i, p) for i, p in enumerate(images, start=1)]
for _ in as_completed(futures): for fut in as_completed(futures):
pass try:
fut.result()
except Exception as exc:
sys.stderr.write(f"[worker] Unerwarteter Fehler: {exc}\n")
print() # newline print() # newline
@@ -268,7 +330,7 @@ def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quali
zip_dir_to_pptx(work_dir, output_pptx) zip_dir_to_pptx(work_dir, output_pptx)
size_after = output_pptx.stat().st_size size_after = output_pptx.stat().st_size
result["size_after"] = size_after result.size_after = size_after
try: try:
with open(log_file, "w", encoding="utf-8") as f: with open(log_file, "w", encoding="utf-8") as f:
@@ -277,9 +339,9 @@ def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quali
pass pass
elapsed = time.perf_counter() - start_time elapsed = time.perf_counter() - start_time
result["elapsed_sec"] = elapsed result.elapsed_sec = elapsed
result["log_file"] = str(log_file) result.log_file = str(log_file)
result["ok"] = True result.ok = True
savings_pct = 0.0 if size_before == 0 else round(100.0 * (size_before - size_after) / size_before, 2) savings_pct = 0.0 if size_before == 0 else round(100.0 * (size_before - size_after) / size_before, 2)
print(f"[OK] Fertig! ({input_pptx.name})") print(f"[OK] Fertig! ({input_pptx.name})")
@@ -291,16 +353,12 @@ def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quali
print(" Log: ", log_file) print(" Log: ", log_file)
except Exception as e: except Exception as e:
result["error"] = str(e) result.error = str(e)
finally: finally:
try: if work_dir is not None:
shutil.rmtree(work_dir, ignore_errors=True) # type: ignore[name-defined] shutil.rmtree(work_dir, ignore_errors=True)
except Exception: if scratch_dir is not None:
pass shutil.rmtree(scratch_dir, ignore_errors=True)
try:
shutil.rmtree(scratch_dir, ignore_errors=True) # type: ignore[name-defined]
except Exception:
pass
cleanup_old_temps() cleanup_old_temps()
return result return result
@@ -359,15 +417,16 @@ def main():
formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
) )
parser.add_argument('-i','--input', nargs='*', help='Input-PPTX (eine oder mehrere, Wildcards erlaubt). Bei mehreren: -O erforderlich.') parser.add_argument('-i','--input', nargs='*', help='Input-PPTX (eine oder mehrere, Wildcards erlaubt). Bei mehreren: -O erforderlich.')
parser.add_argument('--input-dir', help='Eingabe-Verzeichnis (optional, für Batch)')
parser.add_argument('-o','--output', help='Output-PPTX (nur Single-Mode)') parser.add_argument('-o','--output', help='Output-PPTX (nur Single-Mode)')
parser.add_argument('-O','--output-dir', help='Output-Verzeichnis (erforderlich für Batch)') parser.add_argument('-O','--output-dir', help='Output-Verzeichnis (erforderlich für Batch)')
parser.add_argument('--input-dir', help='Eingabe-Verzeichnis (optional, für Batch)')
parser.add_argument('--pattern', default='*.pptx', help='Dateimuster für --input-dir') parser.add_argument('--pattern', default='*.pptx', help='Dateimuster für --input-dir')
parser.add_argument('--recursive', action='store_true', help='Rekursiv in --input-dir suchen') parser.add_argument('--recursive', action='store_true', help='Rekursiv in --input-dir suchen')
#parser.add_argument('-t','--threads', type=int, default=min(32, os.cpu_count() or 4), help='Anzahl paralleler Threads pro Datei') #parser.add_argument('-t','--threads', type=int, default=min(32, os.cpu_count() or 4), help='Anzahl paralleler Threads pro Datei')
parser.add_argument('-t','--threads', type=int, default=16, help='Anzahl paralleler Threads pro Datei') parser.add_argument('-t','--threads', type=int, default=16, help='Anzahl paralleler Threads pro Datei')
parser.add_argument('-q','--quality', type=int, default=90, help='Qualität für caesiumclt (0..100), höher = bessere Qualität / größere Datei') parser.add_argument('-q','--quality', type=int, default=90, help='Qualität für caesiumclt (0..100), höher = bessere Qualität / größere Datei')
parser.add_argument('--version', action='version', version=f'%(prog)s {__version__}') parser.add_argument('--min-savings', default=DEFAULT_MIN_SAVINGS, help="Mindestersparnis für caesiumclt (z. B. 2%%, 100KB, 1MB oder Bytes als Zahl)")
parser.add_argument('--version', action='version', version=f'%(prog)s {__version__}', help="Zeigt die Versionsnummer an" )
args = parser.parse_args() args = parser.parse_args()
@@ -412,14 +471,14 @@ def main():
failures += 1 failures += 1
continue continue
dst = out_dir / f"{src.stem}_compressed.pptx" dst = out_dir / f"{src.stem}_compressed.pptx"
res = process_single_deck(src, dst, args.threads, args.quality) res = process_single_deck(src, dst, args.threads, args.quality, args.min_savings)
if res['ok']: if res.ok:
successes += 1 successes += 1
overall_before += res['size_before'] overall_before += res.size_before
overall_after += res['size_after'] overall_after += res.size_after
else: else:
failures += 1 failures += 1
print(f" Fehler: {src.name} -> {res['error']}") print(f" Fehler: {src.name} -> {res.error}")
else: else:
src = input_files[0] src = input_files[0]
if args.output_dir: if args.output_dir:
@@ -427,14 +486,14 @@ def main():
dst = Path(args.output_dir) / f"{src.stem}_compressed.pptx" dst = Path(args.output_dir) / f"{src.stem}_compressed.pptx"
else: else:
dst = Path(args.output).resolve() if args.output else src.with_name(f"{src.stem}_compressed.pptx") dst = Path(args.output).resolve() if args.output else src.with_name(f"{src.stem}_compressed.pptx")
res = process_single_deck(src, dst, args.threads, args.quality) res = process_single_deck(src, dst, args.threads, args.quality, args.min_savings)
if res['ok']: if res.ok:
successes += 1 successes += 1
overall_before += res['size_before'] overall_before += res.size_before
overall_after += res['size_after'] overall_after += res.size_after
else: else:
failures += 1 failures += 1
print(f" Fehler: {src.name} -> {res['error']}") print(f" Fehler: {src.name} -> {res.error}")
if batch_mode: if batch_mode:

144
test_pptx_image_compress.py Normal file
View File

@@ -0,0 +1,144 @@
import tempfile
import unittest
import zipfile
from pathlib import Path
import pptx_image_compress as pic
class TestPptxImageCompress(unittest.TestCase):
    """Unit tests for the helper functions and the per-deck pipeline."""

    @staticmethod
    def _fixed_size_compressor(size: int):
        """Build a stand-in compressor that always emits *size* bytes of ``B``."""

        def fake_compressor(original: Path, out_dir: Path, caesium_threads: int | None, quality: int, min_savings: str):
            out_dir.mkdir(parents=True, exist_ok=True)
            out = out_dir / original.name
            out.write_bytes(b"B" * size)
            return out

        return fake_compressor

    def test_discover_images_filters_extensions(self):
        """Only allowed extensions are returned, case-insensitively and sorted."""
        with tempfile.TemporaryDirectory() as td:
            media_dir = Path(td)
            for file_name in ("a.jpg", "b.png", "c.txt", "d.GIF"):
                (media_dir / file_name).write_bytes(b"1")

            found = [p.name for p in pic.discover_images(media_dir)]

            self.assertEqual(found, ["a.jpg", "b.png", "d.GIF"])

    def test_image_result_to_log_line(self):
        """The log line carries name, slide reference and percentage saved."""
        line = pic.image_result_to_log_line(
            pic.ImageProcessResult(
                image_name="image1.png",
                orig_size=1000,
                chosen_size=800,
                slide_nr="[1, 2]",
            )
        )

        for expected in ("image1.png", "[1, 2]", "20.0"):
            self.assertIn(expected, line)

    def test_process_image_file_replaces_when_smaller(self):
        """A smaller compressor output overwrites the original image."""
        with tempfile.TemporaryDirectory() as td:
            root = Path(td)
            img = root / "image1.png"
            img.write_bytes(b"A" * 100)

            result = pic.process_image_file(
                idx=1,
                img_path=img,
                scratch_dir=root / "scratch",
                image_to_slides={"image1.png": [1]},
                caesium_threads=1,
                quality=90,
                min_savings="2%",
                compressor=self._fixed_size_compressor(40),
            )

            self.assertEqual(result.chosen_size, 40)
            self.assertEqual(img.stat().st_size, 40)
            self.assertEqual(result.slide_nr, "[1]")

    def test_process_image_file_keeps_original_when_bigger(self):
        """A larger compressor output is discarded and the original kept."""
        with tempfile.TemporaryDirectory() as td:
            root = Path(td)
            img = root / "image1.png"
            img.write_bytes(b"A" * 100)

            result = pic.process_image_file(
                idx=1,
                img_path=img,
                scratch_dir=root / "scratch",
                image_to_slides={},
                caesium_threads=1,
                quality=90,
                min_savings="2%",
                compressor=self._fixed_size_compressor(120),
            )

            self.assertEqual(result.chosen_size, 100)
            self.assertEqual(img.stat().st_size, 100)
            self.assertEqual(result.slide_nr, "NOT_USED")

    def test_process_single_deck_with_injected_compressor(self):
        """End-to-end run over a minimal deck using an injected compressor."""
        with tempfile.TemporaryDirectory() as td:
            root = Path(td)
            input_pptx = root / "input.pptx"
            output_pptx = root / "output.pptx"

            # Minimal deck layout: one slide relationship file, one image.
            source_tree = root / "src"
            rels_dir = source_tree / "ppt" / "slides" / "_rels"
            media_dir = source_tree / "ppt" / "media"
            for directory in (rels_dir, media_dir):
                directory.mkdir(parents=True, exist_ok=True)

            rels_xml = (
                "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>"
                "<Relationships xmlns=\"http://schemas.openxmlformats.org/package/2006/relationships\">"
                "<Relationship Id=\"rId2\" Type=\"http://schemas.openxmlformats.org/officeDocument/2006/relationships/image\" Target=\"../media/image1.png\"/>"
                "</Relationships>"
            )
            (rels_dir / "slide1.xml.rels").write_text(rels_xml, encoding="utf-8")
            (media_dir / "image1.png").write_bytes(b"A" * 100)

            with zipfile.ZipFile(input_pptx, "w", compression=zipfile.ZIP_DEFLATED) as z:
                for p in source_tree.rglob("*"):
                    if not p.is_file():
                        continue
                    z.write(p, arcname=str(p.relative_to(source_tree)))

            result = pic.process_single_deck(
                input_pptx=input_pptx,
                output_pptx=output_pptx,
                threads=2,
                quality=90,
                min_savings="2%",
                compressor=self._fixed_size_compressor(50),
            )

            self.assertTrue(result.ok)
            self.assertIsNone(result.error)
            self.assertTrue(output_pptx.exists())
            self.assertIsNotNone(result.log_file)

            # The rebuilt deck must contain the compressed image.
            with zipfile.ZipFile(output_pptx, "r") as z:
                out_image = z.read("ppt/media/image1.png")
            self.assertEqual(len(out_image), 50)

            log_file = result.log_file
            if log_file is None:
                self.fail("log_file should not be None")
            log_text = Path(log_file).read_text(encoding="utf-8")
            self.assertIn("image1.png", log_text)
            self.assertIn("[1]", log_text)
if __name__ == "__main__":
unittest.main()