from __future__ import annotations import asyncio import csv from pathlib import Path from typing import Any, Dict, List from playwright.async_api import async_playwright, Page, Response from .auth import AUTH_STATE_FILE from .parser import ( parse_person_from_organization_person, parse_direct_emails_from_organization, parse_workingwith_entries, ) from .models import Person, WorkingWithRelation DELV_PERSON_URL = "https://eur.loki.delve.office.com/api/v2/person" DELV_ORG_URL = "https://eur.loki.delve.office.com/api/v1/organization" DELV_WORKINGWITH_URL = "https://eur.loki.delve.office.com/api/v1/workingwith" async def _collect_json_from_responses(page: Page) -> Dict[str, Any]: collected: Dict[str, Any] = {} async def handle_response(response: Response): url = response.url try: if url.startswith(DELV_PERSON_URL): collected["person"] = await response.json() elif url.startswith(DELV_ORG_URL): collected["organization"] = await response.json() elif url.startswith(DELV_WORKINGWITH_URL): collected["workingwith"] = await response.json() except Exception: # JSON-Parsing-Fehler ignorieren pass page.on("response", handle_response) return collected async def _open_profile_and_collect(page: Page, email: str) -> Dict[str, Any]: collected = await _collect_json_from_responses(page) await page.goto("https://m365.cloud.microsoft/search/?auth=2") # Suche öffnen search_button = page.get_by_role("button", name="search") await search_button.click() input_box = page.locator('input[type="text"]') await input_box.fill(f'Person:"{email}"') await input_box.press("Enter") # Warten bis ein Profil-Button erscheint await page.wait_for_timeout(2000) profile_button = page.get_by_title("Organisation") 'await profile_button.click()' # Organisation-Tab org_button = page.locator('button[data-content="Organisation"]') await org_button.click() # Zeit geben, Netzwerk-Calls zu sammeln await page.wait_for_timeout(4000) return collected async def extract_relations_for_manager(manager_email: str) -> List[WorkingWithRelation]: if not AUTH_STATE_FILE.exists(): raise RuntimeError("auth_state.json nicht vorhanden, bitte zuerst login-Modus ausführen.") async with async_playwright() as p: browser_type = p.chromium context = await browser_type.launch_persistent_context( user_data_dir="user_data", headless=False, channel="chrome" ) page = await context.new_page() collected = await _open_profile_and_collect(page, manager_email) person_json = collected.get("person") or {} org_json = collected.get("organization") or {} working_json = collected.get("workingwith") or {} manager_person = parse_person_from_organization_person(person_json) working_with_persons = parse_workingwith_entries(working_json) directs_emails = parse_direct_emails_from_organization(org_json) relations: List[WorkingWithRelation] = [] # Manager -> WorkingWith for dest in working_with_persons: relations.append(WorkingWithRelation(source=manager_person, destination=dest)) print("Found directs: ", len(directs_emails)) # Für alle Directs ebenfalls WorkingWith holen for direct_email in directs_emails: collected_direct = await _open_profile_and_collect(page, direct_email) person_json_d = collected_direct.get("person") or {} working_json_d = collected_direct.get("workingwith") or {} direct_person = parse_person_from_organization_person(person_json_d) working_with_persons_d = parse_workingwith_entries(working_json_d) for dest in working_with_persons_d: relations.append(WorkingWithRelation(source=direct_person, destination=dest)) await context.close() return relations async def extract_relations_for_emails(emails: List[str]) -> List[WorkingWithRelation]: if not AUTH_STATE_FILE.exists(): raise RuntimeError("auth_state.json nicht vorhanden – bitte zuerst login-Modus ausführen.") async with async_playwright() as p: browser_type = p.chromium context = await browser_type.launch_persistent_context( user_data_dir="user_data", headless=False, channel="chrome" ) page = await context.new_page() relations: List[WorkingWithRelation] = [] for email in emails: collected = await _open_profile_and_collect(page, email) person_json = collected.get("person") or {} working_json = collected.get("workingwith") or {} person = parse_person_from_organization_person(person_json) working_with_persons = parse_workingwith_entries(working_json) for dest in working_with_persons: relations.append(WorkingWithRelation(source=person, destination=dest)) await context.close() return relations def write_relations_to_csv(relations: List[WorkingWithRelation], output_path: Path) -> None: fieldnames = [ "source_mail", "source_displayname", "source_jobTitle", "source_department", "destination_mail", "destination_displayname", "destination_jobTitle", "destination_department", ] with output_path.open("w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for rel in relations: writer.writerow( { "source_mail": rel.source.email, "source_displayname": rel.source.display_name, "source_jobTitle": rel.source.job_title, "source_department": rel.source.department, "destination_mail": rel.destination.email, "destination_displayname": rel.destination.display_name, "destination_jobTitle": rel.destination.job_title, "destination_department": rel.destination.department, } )