Files
m365-workingwith/app/extractor.py

173 lines
6.2 KiB
Python

from __future__ import annotations
import asyncio
import csv
from pathlib import Path
from typing import Any, Dict, List
from playwright.async_api import async_playwright, Page, Response
from .auth import AUTH_STATE_FILE
from .parser import (
parse_person_from_organization_person,
parse_direct_emails_from_organization,
parse_workingwith_entries,
)
from .models import Person, WorkingWithRelation
# URL prefixes of the Delve ("loki") API endpoints whose JSON responses are
# captured while a profile page loads. Response URLs are compared against
# these with str.startswith, so query strings and sub-paths also match.
# NOTE(review): the "eur" host name looks region-specific — confirm that it
# is correct for every tenant this tool is run against.
DELV_PERSON_URL = "https://eur.loki.delve.office.com/api/v2/person"
DELV_ORG_URL = "https://eur.loki.delve.office.com/api/v1/organization"
DELV_WORKINGWITH_URL = "https://eur.loki.delve.office.com/api/v1/workingwith"
async def _collect_json_from_responses(page: Page) -> Dict[str, Any]:
    """Start capturing the JSON bodies of Delve API responses seen on *page*.

    A ``response`` listener is registered on *page* and the function returns
    immediately with a dict that the listener fills in later, as matching
    responses arrive. Possible keys are ``"person"``, ``"organization"`` and
    ``"workingwith"``; a later response for the same endpoint overwrites the
    earlier entry.

    NOTE(review): the listener is never removed, so every call adds one more
    listener to the page. Confirm this is acceptable for callers that reuse
    a single page for many look-ups.

    Args:
        page: The Playwright page whose network responses are observed.

    Returns:
        The (initially empty) dict that the listener populates.
    """
    import logging

    log = logging.getLogger(__name__)
    collected: Dict[str, Any] = {}

    # URL prefix -> key under which the decoded body is stored.
    endpoints = (
        (DELV_PERSON_URL, "person"),
        (DELV_ORG_URL, "organization"),
        (DELV_WORKINGWITH_URL, "workingwith"),
    )

    async def handle_response(response: Response) -> None:
        url = response.url
        for prefix, key in endpoints:
            if url.startswith(prefix):
                break
        else:
            # Not one of the endpoints we care about.
            return

        try:
            collected[key] = await response.json()
        except Exception:
            # Capturing is best-effort: a body that cannot be read or decoded
            # must not break the page flow, but leave a trace for debugging
            # instead of discarding the error silently.
            log.debug("Ignoring unreadable JSON response from %s", url, exc_info=True)

    page.on("response", handle_response)
    return collected
async def _open_profile_and_collect(page: Page, email: str) -> Dict[str, Any]:
    """Look up *email* in the M365 search UI and return the captured API JSON.

    Response capturing is started first, then the page is driven through the
    search and the "Organisation" tab so that the Delve API calls are
    triggered. The waits are fixed delays, not conditions, so the returned
    dict may be missing keys if the page was slower than the delays allow.

    Args:
        page: The Playwright page to drive.
        email: Address inserted into the ``Person:"..."`` search query.

    Returns:
        The dict produced by ``_collect_json_from_responses`` for this page.
    """
    collected = await _collect_json_from_responses(page)

    await page.goto("https://m365.cloud.microsoft/search/?auth=2")

    # Open the search box and query for the person.
    await page.get_by_role("button", name="search").click()
    search_input = page.locator('input[type="text"]')
    await search_input.fill(f'Person:"{email}"')
    await search_input.press("Enter")

    # Wait for the search results (and a profile button) to show up.
    await page.wait_for_timeout(2000)

    # NOTE(review): a click on page.get_by_title("Organisation") used to sit
    # here but had been disabled by turning the statement into a string
    # literal. It stays disabled; re-enable it deliberately if the profile
    # card has to be opened before the tab below becomes clickable.

    # Switch to the "Organisation" tab.
    await page.locator('button[data-content="Organisation"]').click()

    # Leave time for the network calls to arrive and be collected.
    await page.wait_for_timeout(4000)
    return collected
async def extract_relations_for_manager(manager_email: str) -> List[WorkingWithRelation]:
    """Collect "working with" relations for a manager and their direct reports.

    The manager's profile is opened first; its organization data yields the
    e-mail addresses of the direct reports, each of which is then opened in
    turn on the same page. Every "working with" entry found becomes one
    relation whose source is the profile being viewed.

    The browser is launched as a persistent Chrome context on the
    ``user_data`` directory, non-headless. The context is closed even when
    scraping or parsing fails part-way through.

    Args:
        manager_email: E-mail address of the manager to start from.

    Returns:
        Relations for the manager followed by those of each direct report.

    Raises:
        RuntimeError: If ``AUTH_STATE_FILE`` does not exist.
    """
    if not AUTH_STATE_FILE.exists():
        raise RuntimeError("auth_state.json nicht vorhanden, bitte zuerst login-Modus ausführen.")

    relations: List[WorkingWithRelation] = []
    async with async_playwright() as p:
        context = await p.chromium.launch_persistent_context(
            user_data_dir="user_data",
            headless=False,
            channel="chrome",
        )
        try:
            page = await context.new_page()

            collected = await _open_profile_and_collect(page, manager_email)
            person_json = collected.get("person") or {}
            org_json = collected.get("organization") or {}
            working_json = collected.get("workingwith") or {}

            manager_person = parse_person_from_organization_person(person_json)
            working_with_persons = parse_workingwith_entries(working_json)
            directs_emails = parse_direct_emails_from_organization(org_json)

            # Manager -> WorkingWith
            relations.extend(
                WorkingWithRelation(source=manager_person, destination=dest)
                for dest in working_with_persons
            )

            print("Found directs: ", len(directs_emails))

            # Fetch the WorkingWith entries of every direct report as well.
            for direct_email in directs_emails:
                collected_direct = await _open_profile_and_collect(page, direct_email)
                person_json_d = collected_direct.get("person") or {}
                working_json_d = collected_direct.get("workingwith") or {}

                direct_person = parse_person_from_organization_person(person_json_d)
                working_with_persons_d = parse_workingwith_entries(working_json_d)
                relations.extend(
                    WorkingWithRelation(source=direct_person, destination=dest)
                    for dest in working_with_persons_d
                )
        finally:
            # Always release the browser context, including on the error path.
            await context.close()
    return relations
async def extract_relations_for_emails(emails: List[str]) -> List[WorkingWithRelation]:
    """Collect "working with" relations for an explicit list of people.

    Each address is opened in turn on one shared page; every "working with"
    entry found becomes one relation whose source is the person looked up.

    The browser is launched as a persistent Chrome context on the
    ``user_data`` directory, non-headless. The context is closed even when
    scraping or parsing fails part-way through.

    Args:
        emails: E-mail addresses to look up, processed in the given order.

    Returns:
        All relations found, grouped in the order of *emails*.

    Raises:
        RuntimeError: If ``AUTH_STATE_FILE`` does not exist.
    """
    if not AUTH_STATE_FILE.exists():
        raise RuntimeError("auth_state.json nicht vorhanden – bitte zuerst login-Modus ausführen.")

    relations: List[WorkingWithRelation] = []
    async with async_playwright() as p:
        context = await p.chromium.launch_persistent_context(
            user_data_dir="user_data",
            headless=False,
            channel="chrome",
        )
        try:
            page = await context.new_page()
            for email in emails:
                collected = await _open_profile_and_collect(page, email)
                person_json = collected.get("person") or {}
                working_json = collected.get("workingwith") or {}

                person = parse_person_from_organization_person(person_json)
                working_with_persons = parse_workingwith_entries(working_json)
                relations.extend(
                    WorkingWithRelation(source=person, destination=dest)
                    for dest in working_with_persons
                )
        finally:
            # Always release the browser context, including on the error path.
            await context.close()
    return relations
def write_relations_to_csv(relations: List[WorkingWithRelation], output_path: Path) -> None:
    """Write *relations* to *output_path* as a UTF-8 CSV file with a header.

    One row is written per relation, with the source person's fields first
    and the destination person's fields after. An existing file is
    overwritten. Missing parent directories are created, and a plain string
    path is accepted as well as a ``Path``.

    Args:
        relations: Relations to export; each needs ``source`` and
            ``destination`` objects exposing ``email``, ``display_name``,
            ``job_title`` and ``department``.
        output_path: Destination file.
    """
    fieldnames = [
        "source_mail",
        "source_displayname",
        "source_jobTitle",
        "source_department",
        "destination_mail",
        "destination_displayname",
        "destination_jobTitle",
        "destination_department",
    ]

    # Accept str paths too, and do not fail just because the target
    # directory has not been created yet.
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    with output_path.open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(
            {
                "source_mail": rel.source.email,
                "source_displayname": rel.source.display_name,
                "source_jobTitle": rel.source.job_title,
                "source_department": rel.source.department,
                "destination_mail": rel.destination.email,
                "destination_displayname": rel.destination.display_name,
                "destination_jobTitle": rel.destination.job_title,
                "destination_department": rel.destination.department,
            }
            for rel in relations
        )