Compare commits
3 Commits
1.1.6
...
c69ec1eecb
| Author | SHA1 | Date | |
|---|---|---|---|
| c69ec1eecb | |||
| 252b2c2cd5 | |||
| 698aac0aba |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -2,3 +2,4 @@ python-3.*-embed-amd64.zip
|
||||
python-embed/*
|
||||
.vscode/launch.json
|
||||
logs/*.log
|
||||
__pycache__/*
|
||||
11
.vscode/settings.json
vendored
Normal file
11
.vscode/settings.json
vendored
Normal file
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"python.testing.unittestArgs": [
|
||||
"-v",
|
||||
"-s",
|
||||
".",
|
||||
"-p",
|
||||
"test_*.py"
|
||||
],
|
||||
"python.testing.pytestEnabled": false,
|
||||
"python.testing.unittestEnabled": true
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
|
||||
# PPTX Image Compressor (CaesiumCLT only)
|
||||
**Version 1.1.4**
|
||||
**Version 1.1.6**
|
||||
|
||||
Dieses Paket enthält:
|
||||
|
||||
@@ -11,8 +11,6 @@ PPTX-Image-Compressor/
|
||||
├─ pptx_image_compress.py
|
||||
├─ bin/
|
||||
│ └─ caesiumclt.exe
|
||||
└─ samples/
|
||||
└─ README.txt
|
||||
```
|
||||
|
||||
## Schnellstart (ohne Admin-Rechte)
|
||||
@@ -34,6 +32,7 @@ Die Batch lädt bei Bedarf automatisch das **Windows Embeddable Python Package**
|
||||
- Entpackt die PPTX in einen Temp‑Ordner
|
||||
- Komprimiert **JPG/JPEG, PNG, WebP, GIF** mit **CaesiumCLT** (Default `-q 90`, `-O bigger`)
|
||||
- Ersetzt Bilder nur, wenn die komprimierte Datei kleiner ist
|
||||
- Ersetzt Bilder nur, wenn sei mindestens 2% kleiner sind (verhindert *doppelte Komprimierung*)
|
||||
- Schreibt ein CSV‑Log (`.log` neben der Output‑PPTX)
|
||||
- Baut eine neue PPTX und zeigt eine Summary (Name, Größe vorher/nachher, Ersparnis %, Zeit)
|
||||
- Räumt alle temporären Dateien auf (keine Caesium‑Tempfiles in der finalen PPTX)
|
||||
@@ -41,6 +40,7 @@ Die Batch lädt bei Bedarf automatisch das **Windows Embeddable Python Package**
|
||||
## Hinweise
|
||||
- `-t` steuert die Parallelität der Python‑Threads; intern wird `caesiumclt --threads 1` gesetzt, sobald `-t > 1`, um Oversubscription zu vermeiden. Default ist 16
|
||||
- `-q` steuert das Qualitätslevel; intern wird `caesiumclt -q` mit diesem Wert von `0..100` benutzt, Default ist 90
|
||||
- `--min-savings` steuert das Mindestmass an Komprimierung zur Verhinderung von doppelter Komprimierunt, Default ist 2%
|
||||
- Die Batch **verwendet bevorzugt das Embeddable Python** neben der BAT; ansonsten sucht sie echte `python.exe`/`py.exe` im PATH, **ignoriert** aber die Microsoft‑Store‑Alias‑Pfade (`WindowsApps`).
|
||||
|
||||
## Manuelle Nutzung des .py (falls Python vorhanden)
|
||||
|
||||
@@ -32,7 +32,8 @@ from pathlib import Path
|
||||
from datetime import timedelta
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from threading import Lock
|
||||
from typing import List, Optional
|
||||
from dataclasses import dataclass
|
||||
from typing import Callable, List, Optional
|
||||
|
||||
|
||||
__version__ = "1.1.6"
|
||||
@@ -43,6 +44,41 @@ TEMP_PREFIX = "pptx_compress_"
|
||||
DEFAULT_MIN_SAVINGS = "2%"
|
||||
|
||||
|
||||
@dataclass
|
||||
class DeckResult:
|
||||
input: str
|
||||
output: str
|
||||
ok: bool = False
|
||||
size_before: int = 0
|
||||
size_after: int = 0
|
||||
elapsed_sec: float = 0.0
|
||||
error: Optional[str] = None
|
||||
log_file: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class ImageProcessResult:
|
||||
image_name: str
|
||||
orig_size: int
|
||||
chosen_size: int
|
||||
slide_nr: str
|
||||
|
||||
|
||||
def discover_images(media_dir: Path) -> list[Path]:
|
||||
images: list[Path] = []
|
||||
if media_dir.exists():
|
||||
for f in sorted(media_dir.iterdir()):
|
||||
if f.is_file() and f.suffix.lower() in ALLOWED_EXT:
|
||||
images.append(f)
|
||||
return images
|
||||
|
||||
|
||||
def image_result_to_log_line(image_result: ImageProcessResult) -> str:
|
||||
saving = image_result.orig_size - image_result.chosen_size
|
||||
saving_percent = round((saving / image_result.orig_size) * 100, 2) if image_result.orig_size > 0 else 0.0
|
||||
return f"{image_result.image_name};{human_kb(image_result.orig_size)};{human_kb(image_result.chosen_size)};{human_kb(saving)};{saving_percent};{image_result.slide_nr}\n"
|
||||
|
||||
|
||||
# -------------------- Utilities --------------------
|
||||
def human_mb(nbytes: int) -> float:
|
||||
return round(nbytes / (1024 * 1024), 2)
|
||||
@@ -80,12 +116,18 @@ def print_progress(i: int, total: int):
|
||||
print(f"\rBilder: |{bar}| {i}/{total} ({pct}%)", end="", flush=True)
|
||||
|
||||
def zip_dir_to_pptx(src_dir: Path, out_pptx: Path):
|
||||
all_files: list[Path] = []
|
||||
for root, _, files in os.walk(src_dir):
|
||||
for f in files:
|
||||
all_files.append(Path(root) / f)
|
||||
|
||||
content_types = [f for f in all_files if f.name == "[Content_Types].xml"]
|
||||
rest = [f for f in all_files if f.name != "[Content_Types].xml"]
|
||||
|
||||
with zipfile.ZipFile(out_pptx, "w", compression=zipfile.ZIP_DEFLATED) as z:
|
||||
for root, _, files in os.walk(src_dir):
|
||||
for f in files:
|
||||
full = Path(root) / f
|
||||
rel = full.relative_to(src_dir)
|
||||
z.write(full, arcname=str(rel))
|
||||
for full in content_types + rest:
|
||||
rel = full.relative_to(src_dir)
|
||||
z.write(full, arcname=str(rel))
|
||||
|
||||
def which(cmd: str):
|
||||
return shutil.which(cmd)
|
||||
@@ -96,7 +138,7 @@ def compress_with_caesium(original: Path, out_dir: Path, caesium_threads: int |
|
||||
raise RuntimeError("[ERROR] 'caesiumclt' wurde nicht gefunden. Bitte CaesiumCLT installieren und in PATH verfügbar machen.")
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
ext = original.suffix.lower()
|
||||
if ext not in {".jpg", ".jpeg", ".png", ".webp", ".gif"}:
|
||||
if ext not in ALLOWED_EXT:
|
||||
return None
|
||||
cmd = [exe, "-q", str(quality), "-O", "bigger", "--min-savings", min_savings, "-o", str(out_dir)]
|
||||
if caesium_threads is not None:
|
||||
@@ -151,27 +193,58 @@ def build_image_slide_index(rels_dir: Path) -> dict[str, List[int]]:
|
||||
return {img: sorted(slides) for img, slides in image_to_slides.items()}
|
||||
|
||||
|
||||
def get_slide_numbers_for_image(rels_dir: Path, image_filename: str) -> Optional[List[int]]:
|
||||
image_to_slides = build_image_slide_index(rels_dir)
|
||||
slides = image_to_slides.get(image_filename)
|
||||
return slides if slides else None
|
||||
def process_image_file(
|
||||
idx: int,
|
||||
img_path: Path,
|
||||
scratch_dir: Path,
|
||||
image_to_slides: dict[str, List[int]],
|
||||
caesium_threads: int | None,
|
||||
quality: int,
|
||||
min_savings: str,
|
||||
compressor: Callable[[Path, Path, int | None, int, str], Path | None],
|
||||
) -> ImageProcessResult:
|
||||
orig_size = img_path.stat().st_size
|
||||
chosen_size = orig_size
|
||||
found_in_slide = image_to_slides.get(img_path.name)
|
||||
slide_nr = "NOT_USED" if found_in_slide is None else str(found_in_slide)
|
||||
|
||||
try:
|
||||
out_sub = scratch_dir / f"img_{idx:06d}"
|
||||
caesium_out = compressor(img_path, out_sub, caesium_threads, quality, min_savings)
|
||||
if caesium_out and caesium_out.exists():
|
||||
s = caesium_out.stat().st_size
|
||||
if s < orig_size:
|
||||
tmp_target = img_path.with_suffix(img_path.suffix + ".tmp")
|
||||
shutil.copy2(caesium_out, tmp_target)
|
||||
tmp_target.replace(img_path)
|
||||
chosen_size = s
|
||||
except Exception:
|
||||
chosen_size = orig_size
|
||||
|
||||
return ImageProcessResult(
|
||||
image_name=img_path.name,
|
||||
orig_size=orig_size,
|
||||
chosen_size=chosen_size,
|
||||
slide_nr=slide_nr,
|
||||
)
|
||||
|
||||
|
||||
# -------------------- Core per-deck processing --------------------
|
||||
def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quality: int, min_savings: str) -> dict:
|
||||
def process_single_deck(
|
||||
input_pptx: Path,
|
||||
output_pptx: Path,
|
||||
threads: int,
|
||||
quality: int,
|
||||
min_savings: str,
|
||||
compressor: Callable[[Path, Path, int | None, int, str], Path | None] = compress_with_caesium,
|
||||
) -> DeckResult:
|
||||
start_time = time.perf_counter()
|
||||
result = {
|
||||
"input": str(input_pptx),
|
||||
"output": str(output_pptx),
|
||||
"ok": False,
|
||||
"size_before": 0,
|
||||
"size_after": 0,
|
||||
"elapsed_sec": 0.0,
|
||||
"error": None,
|
||||
"log_file": None,
|
||||
}
|
||||
result = DeckResult(
|
||||
input=str(input_pptx),
|
||||
output=str(output_pptx),
|
||||
)
|
||||
work_dir: Optional[Path] = None
|
||||
scratch_dir: Optional[Path] = None
|
||||
|
||||
try:
|
||||
if not input_pptx.exists() or input_pptx.suffix.lower() != ".pptx":
|
||||
@@ -182,13 +255,12 @@ def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quali
|
||||
|
||||
work_dir = Path(tempfile.mkdtemp(prefix=TEMP_PREFIX + "work_"))
|
||||
scratch_dir = Path(tempfile.mkdtemp(prefix=TEMP_PREFIX + "scratch_"))
|
||||
|
||||
log_file = output_pptx.with_suffix(".log.csv")
|
||||
ensure_clean_file(log_file)
|
||||
log_lines = ["image_name;size_before(kb);size_after(kb);saving(kb);saving_percent(%);in_slide_number\n"]
|
||||
|
||||
size_before = input_pptx.stat().st_size
|
||||
result["size_before"] = size_before
|
||||
result.size_before = size_before
|
||||
|
||||
with zipfile.ZipFile(input_pptx, "r") as z:
|
||||
z.extractall(work_dir)
|
||||
@@ -197,63 +269,46 @@ def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quali
|
||||
rels_dir = slides_dir / "_rels"
|
||||
media_dir = work_dir / "ppt" / "media"
|
||||
|
||||
images = []
|
||||
|
||||
if media_dir.exists():
|
||||
for f in sorted(media_dir.iterdir()):
|
||||
if f.is_file() and f.suffix.lower() in ALLOWED_EXT:
|
||||
images.append(f)
|
||||
images = discover_images(media_dir)
|
||||
|
||||
total = len(images)
|
||||
print(f"[Processing] {input_pptx.name}: {total} Bild(er) gefunden")
|
||||
print_progress(0, total)
|
||||
|
||||
if not which("caesiumclt"):
|
||||
if not which("caesiumclt") and compressor is compress_with_caesium:
|
||||
raise RuntimeError("'caesiumclt' nicht gefunden. Bitte installieren und in PATH verfügbar machen.")
|
||||
|
||||
caesium_threads = 1 if threads and threads > 1 else None
|
||||
caesium_threads = 1 if threads > 1 else None
|
||||
lock = Lock()
|
||||
done_count = 0
|
||||
image_to_slides = build_image_slide_index(rels_dir)
|
||||
|
||||
def worker(idx: int, img_path: Path):
|
||||
nonlocal done_count
|
||||
orig_size = img_path.stat().st_size
|
||||
chosen_size = orig_size
|
||||
found_in_slide = None
|
||||
slide_nr = ""
|
||||
image_result = process_image_file(
|
||||
idx=idx,
|
||||
img_path=img_path,
|
||||
scratch_dir=scratch_dir,
|
||||
image_to_slides=image_to_slides,
|
||||
caesium_threads=caesium_threads,
|
||||
quality=quality,
|
||||
min_savings=min_savings,
|
||||
compressor=compressor,
|
||||
)
|
||||
|
||||
try:
|
||||
found_in_slide = image_to_slides.get(img_path.name)
|
||||
if found_in_slide is None:
|
||||
slide_nr = "NOT_USED"
|
||||
else:
|
||||
slide_nr = str(found_in_slide)
|
||||
out_sub = scratch_dir / f"img_{idx:06d}"
|
||||
caesium_out = compress_with_caesium(img_path, out_sub, caesium_threads, quality, min_savings)
|
||||
if caesium_out and caesium_out.exists():
|
||||
s = caesium_out.stat().st_size
|
||||
if s < orig_size:
|
||||
tmp_target = img_path.with_suffix(img_path.suffix + ".tmp")
|
||||
shutil.copy2(caesium_out, tmp_target)
|
||||
tmp_target.replace(img_path)
|
||||
chosen_size = s
|
||||
except Exception:
|
||||
chosen_size = orig_size
|
||||
finally:
|
||||
saving = orig_size - chosen_size
|
||||
saving_percent = round((saving / orig_size) * 100, 2) if orig_size > 0 else 0.0
|
||||
|
||||
with lock:
|
||||
log_lines.append(f"{img_path.name};{human_kb(orig_size)};{human_kb(chosen_size)};{human_kb(saving)};{saving_percent};{slide_nr}\n")
|
||||
done_count += 1
|
||||
print_progress(done_count, total)
|
||||
with lock:
|
||||
log_lines.append(image_result_to_log_line(image_result))
|
||||
done_count += 1
|
||||
print_progress(done_count, total)
|
||||
|
||||
if total > 0:
|
||||
with ThreadPoolExecutor(max_workers=max(1, threads)) as ex:
|
||||
futures = [ex.submit(worker, i, p) for i, p in enumerate(images, start=1)]
|
||||
for _ in as_completed(futures):
|
||||
pass
|
||||
for fut in as_completed(futures):
|
||||
try:
|
||||
fut.result()
|
||||
except Exception as exc:
|
||||
sys.stderr.write(f"[worker] Unerwarteter Fehler: {exc}\n")
|
||||
|
||||
print() # newline
|
||||
|
||||
@@ -275,7 +330,7 @@ def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quali
|
||||
|
||||
zip_dir_to_pptx(work_dir, output_pptx)
|
||||
size_after = output_pptx.stat().st_size
|
||||
result["size_after"] = size_after
|
||||
result.size_after = size_after
|
||||
|
||||
try:
|
||||
with open(log_file, "w", encoding="utf-8") as f:
|
||||
@@ -284,9 +339,9 @@ def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quali
|
||||
pass
|
||||
|
||||
elapsed = time.perf_counter() - start_time
|
||||
result["elapsed_sec"] = elapsed
|
||||
result["log_file"] = str(log_file)
|
||||
result["ok"] = True
|
||||
result.elapsed_sec = elapsed
|
||||
result.log_file = str(log_file)
|
||||
result.ok = True
|
||||
|
||||
savings_pct = 0.0 if size_before == 0 else round(100.0 * (size_before - size_after) / size_before, 2)
|
||||
print(f"[OK] Fertig! ({input_pptx.name})")
|
||||
@@ -298,16 +353,12 @@ def process_single_deck(input_pptx: Path, output_pptx: Path, threads: int, quali
|
||||
print(" Log: ", log_file)
|
||||
|
||||
except Exception as e:
|
||||
result["error"] = str(e)
|
||||
result.error = str(e)
|
||||
finally:
|
||||
try:
|
||||
shutil.rmtree(work_dir, ignore_errors=True) # type: ignore[name-defined]
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
shutil.rmtree(scratch_dir, ignore_errors=True) # type: ignore[name-defined]
|
||||
except Exception:
|
||||
pass
|
||||
if work_dir is not None:
|
||||
shutil.rmtree(work_dir, ignore_errors=True)
|
||||
if scratch_dir is not None:
|
||||
shutil.rmtree(scratch_dir, ignore_errors=True)
|
||||
cleanup_old_temps()
|
||||
|
||||
return result
|
||||
@@ -421,13 +472,13 @@ def main():
|
||||
continue
|
||||
dst = out_dir / f"{src.stem}_compressed.pptx"
|
||||
res = process_single_deck(src, dst, args.threads, args.quality, args.min_savings)
|
||||
if res['ok']:
|
||||
if res.ok:
|
||||
successes += 1
|
||||
overall_before += res['size_before']
|
||||
overall_after += res['size_after']
|
||||
overall_before += res.size_before
|
||||
overall_after += res.size_after
|
||||
else:
|
||||
failures += 1
|
||||
print(f" Fehler: {src.name} -> {res['error']}")
|
||||
print(f" Fehler: {src.name} -> {res.error}")
|
||||
else:
|
||||
src = input_files[0]
|
||||
if args.output_dir:
|
||||
@@ -436,13 +487,13 @@ def main():
|
||||
else:
|
||||
dst = Path(args.output).resolve() if args.output else src.with_name(f"{src.stem}_compressed.pptx")
|
||||
res = process_single_deck(src, dst, args.threads, args.quality, args.min_savings)
|
||||
if res['ok']:
|
||||
if res.ok:
|
||||
successes += 1
|
||||
overall_before += res['size_before']
|
||||
overall_after += res['size_after']
|
||||
overall_before += res.size_before
|
||||
overall_after += res.size_after
|
||||
else:
|
||||
failures += 1
|
||||
print(f" Fehler: {src.name} -> {res['error']}")
|
||||
print(f" Fehler: {src.name} -> {res.error}")
|
||||
|
||||
if batch_mode:
|
||||
|
||||
|
||||
144
test_pptx_image_compress.py
Normal file
144
test_pptx_image_compress.py
Normal file
@@ -0,0 +1,144 @@
|
||||
import tempfile
|
||||
import unittest
|
||||
import zipfile
|
||||
from pathlib import Path
|
||||
|
||||
import pptx_image_compress as pic
|
||||
|
||||
|
||||
class TestPptxImageCompress(unittest.TestCase):
|
||||
def test_discover_images_filters_extensions(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
media_dir = Path(td)
|
||||
(media_dir / "a.jpg").write_bytes(b"1")
|
||||
(media_dir / "b.png").write_bytes(b"1")
|
||||
(media_dir / "c.txt").write_bytes(b"1")
|
||||
(media_dir / "d.GIF").write_bytes(b"1")
|
||||
images = pic.discover_images(media_dir)
|
||||
self.assertEqual([p.name for p in images], ["a.jpg", "b.png", "d.GIF"])
|
||||
|
||||
def test_image_result_to_log_line(self):
|
||||
image_result = pic.ImageProcessResult(
|
||||
image_name="image1.png",
|
||||
orig_size=1000,
|
||||
chosen_size=800,
|
||||
slide_nr="[1, 2]",
|
||||
)
|
||||
line = pic.image_result_to_log_line(image_result)
|
||||
self.assertIn("image1.png", line)
|
||||
self.assertIn("[1, 2]", line)
|
||||
self.assertIn("20.0", line)
|
||||
|
||||
def test_process_image_file_replaces_when_smaller(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
root = Path(td)
|
||||
img = root / "image1.png"
|
||||
img.write_bytes(b"A" * 100)
|
||||
scratch = root / "scratch"
|
||||
|
||||
def fake_compressor(original: Path, out_dir: Path, caesium_threads: int | None, quality: int, min_savings: str):
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
out = out_dir / original.name
|
||||
out.write_bytes(b"B" * 40)
|
||||
return out
|
||||
|
||||
result = pic.process_image_file(
|
||||
idx=1,
|
||||
img_path=img,
|
||||
scratch_dir=scratch,
|
||||
image_to_slides={"image1.png": [1]},
|
||||
caesium_threads=1,
|
||||
quality=90,
|
||||
min_savings="2%",
|
||||
compressor=fake_compressor,
|
||||
)
|
||||
|
||||
self.assertEqual(result.chosen_size, 40)
|
||||
self.assertEqual(img.stat().st_size, 40)
|
||||
self.assertEqual(result.slide_nr, "[1]")
|
||||
|
||||
def test_process_image_file_keeps_original_when_bigger(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
root = Path(td)
|
||||
img = root / "image1.png"
|
||||
img.write_bytes(b"A" * 100)
|
||||
scratch = root / "scratch"
|
||||
|
||||
def fake_compressor(original: Path, out_dir: Path, caesium_threads: int | None, quality: int, min_savings: str):
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
out = out_dir / original.name
|
||||
out.write_bytes(b"B" * 120)
|
||||
return out
|
||||
|
||||
result = pic.process_image_file(
|
||||
idx=1,
|
||||
img_path=img,
|
||||
scratch_dir=scratch,
|
||||
image_to_slides={},
|
||||
caesium_threads=1,
|
||||
quality=90,
|
||||
min_savings="2%",
|
||||
compressor=fake_compressor,
|
||||
)
|
||||
|
||||
self.assertEqual(result.chosen_size, 100)
|
||||
self.assertEqual(img.stat().st_size, 100)
|
||||
self.assertEqual(result.slide_nr, "NOT_USED")
|
||||
|
||||
def test_process_single_deck_with_injected_compressor(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
root = Path(td)
|
||||
input_pptx = root / "input.pptx"
|
||||
output_pptx = root / "output.pptx"
|
||||
source_tree = root / "src"
|
||||
rels_dir = source_tree / "ppt" / "slides" / "_rels"
|
||||
media_dir = source_tree / "ppt" / "media"
|
||||
rels_dir.mkdir(parents=True, exist_ok=True)
|
||||
media_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
rels_xml = (
|
||||
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>"
|
||||
"<Relationships xmlns=\"http://schemas.openxmlformats.org/package/2006/relationships\">"
|
||||
"<Relationship Id=\"rId2\" Type=\"http://schemas.openxmlformats.org/officeDocument/2006/relationships/image\" Target=\"../media/image1.png\"/>"
|
||||
"</Relationships>"
|
||||
)
|
||||
(rels_dir / "slide1.xml.rels").write_text(rels_xml, encoding="utf-8")
|
||||
(media_dir / "image1.png").write_bytes(b"A" * 100)
|
||||
|
||||
with zipfile.ZipFile(input_pptx, "w", compression=zipfile.ZIP_DEFLATED) as z:
|
||||
for p in source_tree.rglob("*"):
|
||||
if p.is_file():
|
||||
z.write(p, arcname=str(p.relative_to(source_tree)))
|
||||
|
||||
def fake_compressor(original: Path, out_dir: Path, caesium_threads: int | None, quality: int, min_savings: str):
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
out = out_dir / original.name
|
||||
out.write_bytes(b"B" * 50)
|
||||
return out
|
||||
|
||||
result = pic.process_single_deck(
|
||||
input_pptx=input_pptx,
|
||||
output_pptx=output_pptx,
|
||||
threads=2,
|
||||
quality=90,
|
||||
min_savings="2%",
|
||||
compressor=fake_compressor,
|
||||
)
|
||||
|
||||
self.assertTrue(result.ok)
|
||||
self.assertEqual(result.error, None)
|
||||
self.assertTrue(output_pptx.exists())
|
||||
self.assertIsNotNone(result.log_file)
|
||||
with zipfile.ZipFile(output_pptx, "r") as z:
|
||||
out_image = z.read("ppt/media/image1.png")
|
||||
self.assertEqual(len(out_image), 50)
|
||||
log_file = result.log_file
|
||||
if log_file is None:
|
||||
self.fail("log_file should not be None")
|
||||
log_text = Path(log_file).read_text(encoding="utf-8")
|
||||
self.assertIn("image1.png", log_text)
|
||||
self.assertIn("[1]", log_text)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user