From c69ec1eecbc1e3f88465d28b5da83c0bce83d700 Mon Sep 17 00:00:00 2001 From: Frank Conrads Date: Thu, 9 Apr 2026 10:26:45 +0200 Subject: [PATCH] fix: apply medium/high severity code review findings - Re-raise worker futures in as_completed to surface thread exceptions - Replace hardcoded extension set with ALLOWED_EXT constant in compress_with_caesium - Initialise work_dir/scratch_dir to None before try block to prevent NameError in finally - Remove unused dead function get_slide_numbers_for_image - Simplify redundant caesium_threads guard (threads and threads > 1 -> threads > 1) - Write [Content_Types].xml first in ZIP to satisfy OOXML spec Co-Authored-By: Abacus.AI CLI --- pptx_image_compress.py | 48 +++++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/pptx_image_compress.py b/pptx_image_compress.py index 2e06c4a..f75ff16 100644 --- a/pptx_image_compress.py +++ b/pptx_image_compress.py @@ -116,12 +116,18 @@ def print_progress(i: int, total: int): print(f"\rBilder: |{bar}| {i}/{total} ({pct}%)", end="", flush=True) def zip_dir_to_pptx(src_dir: Path, out_pptx: Path): + all_files: list[Path] = [] + for root, _, files in os.walk(src_dir): + for f in files: + all_files.append(Path(root) / f) + + content_types = [f for f in all_files if f.name == "[Content_Types].xml"] + rest = [f for f in all_files if f.name != "[Content_Types].xml"] + with zipfile.ZipFile(out_pptx, "w", compression=zipfile.ZIP_DEFLATED) as z: - for root, _, files in os.walk(src_dir): - for f in files: - full = Path(root) / f - rel = full.relative_to(src_dir) - z.write(full, arcname=str(rel)) + for full in content_types + rest: + rel = full.relative_to(src_dir) + z.write(full, arcname=str(rel)) def which(cmd: str): return shutil.which(cmd) @@ -132,7 +138,7 @@ def compress_with_caesium(original: Path, out_dir: Path, caesium_threads: int | raise RuntimeError("[ERROR] 'caesiumclt' wurde nicht gefunden. Bitte CaesiumCLT installieren und in PATH verfügbar machen.") out_dir.mkdir(parents=True, exist_ok=True) ext = original.suffix.lower() - if ext not in {".jpg", ".jpeg", ".png", ".webp", ".gif"}: + if ext not in ALLOWED_EXT: return None cmd = [exe, "-q", str(quality), "-O", "bigger", "--min-savings", min_savings, "-o", str(out_dir)] if caesium_threads is not None: @@ -187,12 +193,6 @@ def build_image_slide_index(rels_dir: Path) -> dict[str, List[int]]: return {img: sorted(slides) for img, slides in image_to_slides.items()} -def get_slide_numbers_for_image(rels_dir: Path, image_filename: str) -> Optional[List[int]]: - image_to_slides = build_image_slide_index(rels_dir) - slides = image_to_slides.get(image_filename) - return slides if slides else None - - def process_image_file( idx: int, img_path: Path, @@ -243,6 +243,8 @@ def process_single_deck( input=str(input_pptx), output=str(output_pptx), ) + work_dir: Optional[Path] = None + scratch_dir: Optional[Path] = None try: if not input_pptx.exists() or input_pptx.suffix.lower() != ".pptx": @@ -253,7 +255,6 @@ def process_single_deck( work_dir = Path(tempfile.mkdtemp(prefix=TEMP_PREFIX + "work_")) scratch_dir = Path(tempfile.mkdtemp(prefix=TEMP_PREFIX + "scratch_")) - log_file = output_pptx.with_suffix(".log.csv") ensure_clean_file(log_file) log_lines = ["image_name;size_before(kb);size_after(kb);saving(kb);saving_percent(%);in_slide_number\n"] @@ -277,7 +278,7 @@ def process_single_deck( if not which("caesiumclt") and compressor is compress_with_caesium: raise RuntimeError("'caesiumclt' nicht gefunden. Bitte installieren und in PATH verfügbar machen.") - caesium_threads = 1 if threads and threads > 1 else None + caesium_threads = 1 if threads > 1 else None lock = Lock() done_count = 0 image_to_slides = build_image_slide_index(rels_dir) @@ -303,8 +304,11 @@ def process_single_deck( if total > 0: with ThreadPoolExecutor(max_workers=max(1, threads)) as ex: futures = [ex.submit(worker, i, p) for i, p in enumerate(images, start=1)] - for _ in as_completed(futures): - pass + for fut in as_completed(futures): + try: + fut.result() + except Exception as exc: + sys.stderr.write(f"[worker] Unerwarteter Fehler: {exc}\n") print() # newline @@ -351,14 +355,10 @@ def process_single_deck( except Exception as e: result.error = str(e) finally: - try: - shutil.rmtree(work_dir, ignore_errors=True) # type: ignore[name-defined] - except Exception: - pass - try: - shutil.rmtree(scratch_dir, ignore_errors=True) # type: ignore[name-defined] - except Exception: - pass + if work_dir is not None: + shutil.rmtree(work_dir, ignore_errors=True) + if scratch_dir is not None: + shutil.rmtree(scratch_dir, ignore_errors=True) cleanup_old_temps() return result