From 698aac0aba1b47f0b22cda443099606af692b116 Mon Sep 17 00:00:00 2001 From: Frank Conrads Date: Thu, 9 Apr 2026 10:10:57 +0200 Subject: [PATCH] Refactor and UnitTest --- .gitignore | 3 +- .vscode/settings.json | 11 +++ pptx_image_compress.py | 175 +++++++++++++++++++++++------------- test_pptx_image_compress.py | 144 +++++++++++++++++++++++++++++ 4 files changed, 270 insertions(+), 63 deletions(-) create mode 100644 .vscode/settings.json create mode 100644 test_pptx_image_compress.py diff --git a/.gitignore b/.gitignore index 4e1a59c..5a597e0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ python-3.*-embed-amd64.zip python-embed/* .vscode/launch.json -logs/*.log \ No newline at end of file +logs/*.log +__pycache__/* \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..184c4eb --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,11 @@ +{ + "python.testing.unittestArgs": [ + "-v", + "-s", + ".", + "-p", + "test_*.py" + ], + "python.testing.pytestEnabled": false, + "python.testing.unittestEnabled": true +} \ No newline at end of file diff --git a/pptx_image_compress.py b/pptx_image_compress.py index da20fdd..2e06c4a 100644 --- a/pptx_image_compress.py +++ b/pptx_image_compress.py @@ -32,7 +32,8 @@ from pathlib import Path from datetime import timedelta from concurrent.futures import ThreadPoolExecutor, as_completed from threading import Lock -from typing import List, Optional +from dataclasses import dataclass +from typing import Callable, List, Optional __version__ = "1.1.6" @@ -43,6 +44,41 @@ TEMP_PREFIX = "pptx_compress_" DEFAULT_MIN_SAVINGS = "2%" +@dataclass +class DeckResult: + input: str + output: str + ok: bool = False + size_before: int = 0 + size_after: int = 0 + elapsed_sec: float = 0.0 + error: Optional[str] = None + log_file: Optional[str] = None + + +@dataclass +class ImageProcessResult: + image_name: str + orig_size: int + chosen_size: int + slide_nr: str + + +def 
discover_images(media_dir: Path) -> list[Path]: + images: list[Path] = [] + if media_dir.exists(): + for f in sorted(media_dir.iterdir()): + if f.is_file() and f.suffix.lower() in ALLOWED_EXT: + images.append(f) + return images + + +def image_result_to_log_line(image_result: ImageProcessResult) -> str: + saving = image_result.orig_size - image_result.chosen_size + saving_percent = round((saving / image_result.orig_size) * 100, 2) if image_result.orig_size > 0 else 0.0 + return f"{image_result.image_name};{human_kb(image_result.orig_size)};{human_kb(image_result.chosen_size)};{human_kb(saving)};{saving_percent};{image_result.slide_nr}\n" + + # -------------------- Utilities -------------------- def human_mb(nbytes: int) -> float: return round(nbytes / (1024 * 1024), 2) @@ -157,21 +193,56 @@ def get_slide_numbers_for_image(rels_dir: Path, image_filename: str) -> Optional return slides if slides else None +def process_image_file( + idx: int, + img_path: Path, + scratch_dir: Path, + image_to_slides: dict[str, List[int]], + caesium_threads: int | None, + quality: int, + min_savings: str, + compressor: Callable[[Path, Path, int | None, int, str], Path | None], +) -> ImageProcessResult: + orig_size = img_path.stat().st_size + chosen_size = orig_size + found_in_slide = image_to_slides.get(img_path.name) + slide_nr = "NOT_USED" if found_in_slide is None else str(found_in_slide) + + try: + out_sub = scratch_dir / f"img_{idx:06d}" + caesium_out = compressor(img_path, out_sub, caesium_threads, quality, min_savings) + if caesium_out and caesium_out.exists(): + s = caesium_out.stat().st_size + if s < orig_size: + tmp_target = img_path.with_suffix(img_path.suffix + ".tmp") + shutil.copy2(caesium_out, tmp_target) + tmp_target.replace(img_path) + chosen_size = s + except Exception: + chosen_size = orig_size + + return ImageProcessResult( + image_name=img_path.name, + orig_size=orig_size, + chosen_size=chosen_size, + slide_nr=slide_nr, + ) # -------------------- Core per-deck 
processing -------------------- -def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quality: int, min_savings: str) -> dict: +def process_single_deck( + input_pptx: Path, + output_pptx: Path, + threads: int, + quality: int, + min_savings: str, + compressor: Callable[[Path, Path, int | None, int, str], Path | None] = compress_with_caesium, +) -> DeckResult: start_time = time.perf_counter() - result = { - "input": str(input_pptx), - "output": str(output_pptx), - "ok": False, - "size_before": 0, - "size_after": 0, - "elapsed_sec": 0.0, - "error": None, - "log_file": None, - } + result = DeckResult( + input=str(input_pptx), + output=str(output_pptx), + ) try: if not input_pptx.exists() or input_pptx.suffix.lower() != ".pptx": @@ -188,7 +259,7 @@ def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quali log_lines = ["image_name;size_before(kb);size_after(kb);saving(kb);saving_percent(%);in_slide_number\n"] size_before = input_pptx.stat().st_size - result["size_before"] = size_before + result.size_before = size_before with zipfile.ZipFile(input_pptx, "r") as z: z.extractall(work_dir) @@ -196,19 +267,14 @@ def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quali slides_dir = work_dir / "ppt" / "slides" rels_dir = slides_dir / "_rels" media_dir = work_dir / "ppt" / "media" - - images = [] - if media_dir.exists(): - for f in sorted(media_dir.iterdir()): - if f.is_file() and f.suffix.lower() in ALLOWED_EXT: - images.append(f) + images = discover_images(media_dir) total = len(images) print(f"[Processing] {input_pptx.name}: {total} Bild(er) gefunden") print_progress(0, total) - if not which("caesiumclt"): + if not which("caesiumclt") and compressor is compress_with_caesium: raise RuntimeError("'caesiumclt' nicht gefunden. 
Bitte installieren und in PATH verfügbar machen.") caesium_threads = 1 if threads and threads > 1 else None @@ -218,36 +284,21 @@ def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quali def worker(idx: int, img_path: Path): nonlocal done_count - orig_size = img_path.stat().st_size - chosen_size = orig_size - found_in_slide = None - slide_nr = "" + image_result = process_image_file( + idx=idx, + img_path=img_path, + scratch_dir=scratch_dir, + image_to_slides=image_to_slides, + caesium_threads=caesium_threads, + quality=quality, + min_savings=min_savings, + compressor=compressor, + ) - try: - found_in_slide = image_to_slides.get(img_path.name) - if found_in_slide is None: - slide_nr = "NOT_USED" - else: - slide_nr = str(found_in_slide) - out_sub = scratch_dir / f"img_{idx:06d}" - caesium_out = compress_with_caesium(img_path, out_sub, caesium_threads, quality, min_savings) - if caesium_out and caesium_out.exists(): - s = caesium_out.stat().st_size - if s < orig_size: - tmp_target = img_path.with_suffix(img_path.suffix + ".tmp") - shutil.copy2(caesium_out, tmp_target) - tmp_target.replace(img_path) - chosen_size = s - except Exception: - chosen_size = orig_size - finally: - saving = orig_size - chosen_size - saving_percent = round((saving / orig_size) * 100, 2) if orig_size > 0 else 0.0 - - with lock: - log_lines.append(f"{img_path.name};{human_kb(orig_size)};{human_kb(chosen_size)};{human_kb(saving)};{saving_percent};{slide_nr}\n") - done_count += 1 - print_progress(done_count, total) + with lock: + log_lines.append(image_result_to_log_line(image_result)) + done_count += 1 + print_progress(done_count, total) if total > 0: with ThreadPoolExecutor(max_workers=max(1, threads)) as ex: @@ -275,7 +326,7 @@ def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quali zip_dir_to_pptx(work_dir, output_pptx) size_after = output_pptx.stat().st_size - result["size_after"] = size_after + result.size_after = size_after try: with 
open(log_file, "w", encoding="utf-8") as f: @@ -284,9 +335,9 @@ def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quali pass elapsed = time.perf_counter() - start_time - result["elapsed_sec"] = elapsed - result["log_file"] = str(log_file) - result["ok"] = True + result.elapsed_sec = elapsed + result.log_file = str(log_file) + result.ok = True savings_pct = 0.0 if size_before == 0 else round(100.0 * (size_before - size_after) / size_before, 2) print(f"[OK] Fertig! ({input_pptx.name})") @@ -298,7 +349,7 @@ def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quali print(" Log: ", log_file) except Exception as e: - result["error"] = str(e) + result.error = str(e) finally: try: shutil.rmtree(work_dir, ignore_errors=True) # type: ignore[name-defined] @@ -421,13 +472,13 @@ def main(): continue dst = out_dir / f"{src.stem}_compressed.pptx" res = process_single_deck(src, dst, args.threads, args.quality, args.min_savings) - if res['ok']: + if res.ok: successes += 1 - overall_before += res['size_before'] - overall_after += res['size_after'] + overall_before += res.size_before + overall_after += res.size_after else: failures += 1 - print(f" Fehler: {src.name} -> {res['error']}") + print(f" Fehler: {src.name} -> {res.error}") else: src = input_files[0] if args.output_dir: @@ -436,13 +487,13 @@ def main(): else: dst = Path(args.output).resolve() if args.output else src.with_name(f"{src.stem}_compressed.pptx") res = process_single_deck(src, dst, args.threads, args.quality, args.min_savings) - if res['ok']: + if res.ok: successes += 1 - overall_before += res['size_before'] - overall_after += res['size_after'] + overall_before += res.size_before + overall_after += res.size_after else: failures += 1 - print(f" Fehler: {src.name} -> {res['error']}") + print(f" Fehler: {src.name} -> {res.error}") if batch_mode: diff --git a/test_pptx_image_compress.py b/test_pptx_image_compress.py new file mode 100644 index 0000000..8b9dee1 --- /dev/null 
+++ b/test_pptx_image_compress.py @@ -0,0 +1,144 @@ +import tempfile +import unittest +import zipfile +from pathlib import Path + +import pptx_image_compress as pic + + +class TestPptxImageCompress(unittest.TestCase): + def test_discover_images_filters_extensions(self): + with tempfile.TemporaryDirectory() as td: + media_dir = Path(td) + (media_dir / "a.jpg").write_bytes(b"1") + (media_dir / "b.png").write_bytes(b"1") + (media_dir / "c.txt").write_bytes(b"1") + (media_dir / "d.GIF").write_bytes(b"1") + images = pic.discover_images(media_dir) + self.assertEqual([p.name for p in images], ["a.jpg", "b.png", "d.GIF"]) + + def test_image_result_to_log_line(self): + image_result = pic.ImageProcessResult( + image_name="image1.png", + orig_size=1000, + chosen_size=800, + slide_nr="[1, 2]", + ) + line = pic.image_result_to_log_line(image_result) + self.assertIn("image1.png", line) + self.assertIn("[1, 2]", line) + self.assertIn("20.0", line) + + def test_process_image_file_replaces_when_smaller(self): + with tempfile.TemporaryDirectory() as td: + root = Path(td) + img = root / "image1.png" + img.write_bytes(b"A" * 100) + scratch = root / "scratch" + + def fake_compressor(original: Path, out_dir: Path, caesium_threads: int | None, quality: int, min_savings: str): + out_dir.mkdir(parents=True, exist_ok=True) + out = out_dir / original.name + out.write_bytes(b"B" * 40) + return out + + result = pic.process_image_file( + idx=1, + img_path=img, + scratch_dir=scratch, + image_to_slides={"image1.png": [1]}, + caesium_threads=1, + quality=90, + min_savings="2%", + compressor=fake_compressor, + ) + + self.assertEqual(result.chosen_size, 40) + self.assertEqual(img.stat().st_size, 40) + self.assertEqual(result.slide_nr, "[1]") + + def test_process_image_file_keeps_original_when_bigger(self): + with tempfile.TemporaryDirectory() as td: + root = Path(td) + img = root / "image1.png" + img.write_bytes(b"A" * 100) + scratch = root / "scratch" + + def fake_compressor(original: Path, 
out_dir: Path, caesium_threads: int | None, quality: int, min_savings: str): + out_dir.mkdir(parents=True, exist_ok=True) + out = out_dir / original.name + out.write_bytes(b"B" * 120) + return out + + result = pic.process_image_file( + idx=1, + img_path=img, + scratch_dir=scratch, + image_to_slides={}, + caesium_threads=1, + quality=90, + min_savings="2%", + compressor=fake_compressor, + ) + + self.assertEqual(result.chosen_size, 100) + self.assertEqual(img.stat().st_size, 100) + self.assertEqual(result.slide_nr, "NOT_USED") + + def test_process_single_deck_with_injected_compressor(self): + with tempfile.TemporaryDirectory() as td: + root = Path(td) + input_pptx = root / "input.pptx" + output_pptx = root / "output.pptx" + source_tree = root / "src" + rels_dir = source_tree / "ppt" / "slides" / "_rels" + media_dir = source_tree / "ppt" / "media" + rels_dir.mkdir(parents=True, exist_ok=True) + media_dir.mkdir(parents=True, exist_ok=True) + + rels_xml = ( + '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>' + '<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">' + '<Relationship Id="rId1" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/image" Target="../media/image1.png"/>' + '</Relationships>' + ) + (rels_dir / "slide1.xml.rels").write_text(rels_xml, encoding="utf-8") + (media_dir / "image1.png").write_bytes(b"A" * 100) + + with zipfile.ZipFile(input_pptx, "w", compression=zipfile.ZIP_DEFLATED) as z: + for p in source_tree.rglob("*"): + if p.is_file(): + z.write(p, arcname=str(p.relative_to(source_tree))) + + def fake_compressor(original: Path, out_dir: Path, caesium_threads: int | None, quality: int, min_savings: str): + out_dir.mkdir(parents=True, exist_ok=True) + out = out_dir / original.name + out.write_bytes(b"B" * 50) + return out + + result = pic.process_single_deck( + input_pptx=input_pptx, + output_pptx=output_pptx, + threads=2, + quality=90, + min_savings="2%", + compressor=fake_compressor, + ) + + self.assertTrue(result.ok) + self.assertEqual(result.error, None) + self.assertTrue(output_pptx.exists()) + self.assertIsNotNone(result.log_file) + with zipfile.ZipFile(output_pptx, "r") as z: + out_image = z.read("ppt/media/image1.png") + self.assertEqual(len(out_image), 50) + 
log_file = result.log_file + if log_file is None: + self.fail("log_file should not be None") + log_text = Path(log_file).read_text(encoding="utf-8") + self.assertIn("image1.png", log_text) + self.assertIn("[1]", log_text) + + +if __name__ == "__main__": + unittest.main()