Erster Checkin: Tool arbeitet

This commit is contained in:
2026-02-02 12:40:51 +01:00
parent 69fcca1fd0
commit 12926c7e83
12 changed files with 616 additions and 0 deletions

1
app/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Package-Initialisierung für m365-workingwith

28
app/aggregator.py Normal file
View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from collections import Counter
from dataclasses import dataclass
from typing import List
from .models import WorkingWithRelation
@dataclass
class DepartmentLink:
    """Directed, weighted link between two departments."""

    # Department on the originating side of the counted relations.
    source_department: str
    # Department on the receiving side of the counted relations.
    destination_department: str
    # Number of relations counted for this (source, destination) pair.
    weight: int
def aggregate_department_links(relations: List[WorkingWithRelation]) -> List[DepartmentLink]:
    """Count relations per (source department, destination department) pair.

    Department names are stripped of surrounding whitespace. A relation is
    ignored when either side has no department (``None`` or blank).

    Returns:
        One ``DepartmentLink`` per distinct pair, ordered by descending
        weight; pairs with equal weight keep first-seen order.
    """

    def department_pair(relation):
        # Normalise both sides the same way: None -> "", then trim.
        origin = (relation.source.department or "").strip()
        target = (relation.destination.department or "").strip()
        return origin, target

    pair_counts: Counter[tuple[str, str]] = Counter(
        pair for pair in map(department_pair, relations) if all(pair)
    )
    unsorted_links = [
        DepartmentLink(
            source_department=pair[0],
            destination_department=pair[1],
            weight=count,
        )
        for pair, count in pair_counts.items()
    ]
    return sorted(unsorted_links, key=lambda link: link.weight, reverse=True)

51
app/auth.py Normal file
View File

@@ -0,0 +1,51 @@
import asyncio
import json
from pathlib import Path
from playwright.async_api import async_playwright
# File (relative to the current working directory) to which ensure_login()
# writes the Playwright storage state as JSON.
AUTH_STATE_FILE = Path("auth_state.json")
async def ensure_login(email: str) -> None:
    """Open Chrome on the Microsoft 365 search page and persist the auth state.

    A persistent browser profile in ``user_data`` is used, so the launch is
    the same whether or not ``AUTH_STATE_FILE`` already exists. Once the page
    has reached ``domcontentloaded`` the context's storage state is written
    to ``AUTH_STATE_FILE`` as JSON.

    NOTE(review): the long wait that would give the user time to log in
    (``page.wait_for_timeout(1000 * 1000)``) was already disabled; this
    function keeps that behaviour and does not block until login completes.

    Args:
        email: Account name shown in the console prompt. It is only printed,
            not used for the login itself.
    """
    async with async_playwright() as p:
        context = await p.chromium.launch_persistent_context(
            user_data_dir="user_data",
            headless=False,
            channel="chrome",
        )
        try:
            page = await context.new_page()
            await page.goto("https://m365.cloud.microsoft/search/?auth=2")

            print(f"Bitte mit {email} in Microsoft 365 anmelden.")
            print("Nach erfolgreichem Login Browser-Fenster schliessen oder Skript abbrechen (Ctrl+C).")

            await page.wait_for_load_state("domcontentloaded")

            # storage_state() returns a dict, so serialise it as JSON.
            state = await context.storage_state()
            AUTH_STATE_FILE.write_text(json.dumps(state), encoding="utf-8")
        finally:
            # Always close the context so the profile is released even when
            # navigation or saving the state fails.
            await context.close()
def ensure_login_sync(email: str) -> None:
    """Blocking wrapper that runs ``ensure_login(email)`` via ``asyncio.run``."""
    login_coroutine = ensure_login(email)
    asyncio.run(login_coroutine)

101
app/easyvisualize.py Normal file
View File

@@ -0,0 +1,101 @@
import pandas as pd
import plotly.graph_objects as go
import sys
import os
# --- CONFIGURATION & FILE PATH ---
# Use the CSV path passed as the first command-line argument. Otherwise fall
# back to the file app.main writes, relative to the current working directory
# (the previous default pointed at the filesystem root).
if len(sys.argv) > 1:
    csv_dateipfad = sys.argv[1]
else:
    csv_dateipfad = 'output/relations.csv'

# Abort early with a clear message when the input file is missing.
if not os.path.exists(csv_dateipfad):
    print(f"Fehler: Die Datei '{csv_dateipfad}' wurde nicht gefunden.")
    sys.exit(1)

# --- LOAD DATA ---
try:
    # sep=None with the python engine lets pandas detect the delimiter itself,
    # so both comma- and semicolon-separated files are accepted.
    df = pd.read_csv(csv_dateipfad, sep=None, engine='python')
except Exception as e:
    print(f"Fehler beim Lesen der CSV: {e}")
    sys.exit(1)
else:
    print(f"Daten erfolgreich geladen: {len(df)} Zeile aus '{csv_dateipfad}'.")

# --- AGGREGATE DATA ---
# Count the rows per (source department, destination person) pair.
# Alternative: group by 'destination_department' instead of
# 'destination_displayname' to get department-to-department flows.
df_agg = (
    df.groupby(['source_department', 'destination_displayname'])
    .size()
    .reset_index(name='weight')
)

# All distinct node labels: source departments first, then destinations.
all_nodes = list(
    pd.concat([df_agg['source_department'], df_agg['destination_displayname']]).unique()
)
node_map = {name: i for i, name in enumerate(all_nodes)}

# Translate the labels of every link into node indices for the Sankey trace.
source_indices = df_agg['source_department'].map(node_map)
target_indices = df_agg['destination_displayname'].map(node_map)
weights = df_agg['weight']

# --- COLOURS ---
# Source departments get a different colour than the destination nodes.
# The set is built once instead of recomputing unique() for every node.
source_nodes = set(df['source_department'].unique())
node_colors = [
    "#1f77b4" if node in source_nodes else "#9467bd"
    for node in all_nodes
]

# --- VISUALISATION ---
fig = go.Figure(data=[go.Sankey(
    node=dict(
        pad=20,
        thickness=30,
        line=dict(color="black", width=0.5),
        label=all_nodes,
        color=node_colors,
    ),
    link=dict(
        source=source_indices,
        target=target_indices,
        value=weights,
        # Semi-transparent grey paths.
        color="rgba(200, 200, 200, 0.5)",
    ),
)])
fig.update_layout(
    title_text=f"Organisatorische Netzwerkanalyse: Abteilungs-Flüsse<br><sup>Quelle: {csv_dateipfad}</sup>",
    font_size=12,
    height=800,
)

# --- OUTPUT ---
output_filename = "netzwerk_analyse_lokal.html"
fig.write_html(output_filename)
print(f"Analyse abgeschlossen. Interaktives Diagramm gespeichert unter: {output_filename}")

# Open the generated file in the default browser.
import webbrowser
webbrowser.open('file://' + os.path.realpath(output_filename))

174
app/extractor.py Normal file
View File

@@ -0,0 +1,174 @@
from __future__ import annotations
import asyncio
import csv
from pathlib import Path
from typing import Any, Dict, List
from playwright.async_api import async_playwright, Page, Response
from .auth import AUTH_STATE_FILE
from .parser import (
parse_person_from_organization_person,
parse_direct_emails_from_organization,
parse_workingwith_entries,
)
from .models import Person, WorkingWithRelation
# URL prefixes of the Delve API responses that are captured while a profile
# is open; responses are matched against them with str.startswith().
DELV_PERSON_URL = "https://eur.loki.delve.office.com/api/v2/person"
DELV_ORG_URL = "https://eur.loki.delve.office.com/api/v1/organization"
DELV_WORKINGWITH_URL = "https://eur.loki.delve.office.com/api/v1/workingwith"
async def _collect_json_from_responses(page: Page) -> Dict[str, Any]:
    """Register a response listener on *page* and return the dict it fills.

    The returned dict starts empty. Whenever a response URL starts with one
    of the Delve prefixes, its JSON body is stored under ``"person"``,
    ``"organization"`` or ``"workingwith"``; a later response for the same
    prefix overwrites the earlier one. The listener is never removed here.
    """
    captured: Dict[str, Any] = {}
    prefix_to_key = (
        (DELV_PERSON_URL, "person"),
        (DELV_ORG_URL, "organization"),
        (DELV_WORKINGWITH_URL, "workingwith"),
    )

    async def on_response(response: Response):
        url = response.url
        try:
            target_key = next(
                (key for prefix, key in prefix_to_key if url.startswith(prefix)),
                None,
            )
            if target_key is not None:
                captured[target_key] = await response.json()
        except Exception:
            # Best effort: responses whose body cannot be parsed are skipped.
            pass

    page.on("response", on_response)
    return captured
async def _open_profile_and_collect(page: Page, email: str) -> Dict[str, Any]:
    """Search for *email* in Microsoft 365 and collect the profile's API data.

    A response listener is installed first, then the search page is driven
    through the UI: open the search, query ``Person:"<email>"`` and open the
    "Organisation" tab. Fixed delays give the page time to issue its network
    calls.

    Returns:
        The dict filled by the response listener. Keys are present only for
        responses that were actually captured.
    """
    collected = await _collect_json_from_responses(page)

    await page.goto("https://m365.cloud.microsoft/search/?auth=2")

    # Open the search box and look the person up by e-mail address.
    search_button = page.get_by_role("button", name="search")
    await search_button.click()
    input_box = page.locator('input[type="text"]')
    await input_box.fill(f'Person:"{email}"')
    await input_box.press("Enter")

    # Give the result page time to render before clicking.
    await page.wait_for_timeout(2000)

    # Switch to the "Organisation" tab.
    org_button = page.locator('button[data-content="Organisation"]')
    await org_button.click()

    # Allow time for the network calls to be captured.
    await page.wait_for_timeout(4000)
    return collected
async def extract_relations_for_manager(manager_email: str) -> List[WorkingWithRelation]:
    """Collect "working with" relations for a manager and all direct reports.

    The manager's profile is opened first; its organization data yields the
    e-mail addresses of the direct reports, whose profiles are then opened
    one after another in the same page.

    Args:
        manager_email: E-mail address used to look up the manager's profile.

    Returns:
        One relation per (manager or direct report, working-with person).

    Raises:
        RuntimeError: If ``AUTH_STATE_FILE`` does not exist.
    """
    if not AUTH_STATE_FILE.exists():
        raise RuntimeError("auth_state.json nicht vorhanden, bitte zuerst login-Modus ausführen.")

    async with async_playwright() as p:
        context = await p.chromium.launch_persistent_context(
            user_data_dir="user_data",
            headless=False,
            channel="chrome"
        )
        try:
            page = await context.new_page()

            collected = await _open_profile_and_collect(page, manager_email)
            person_json = collected.get("person") or {}
            org_json = collected.get("organization") or {}
            working_json = collected.get("workingwith") or {}

            manager_person = parse_person_from_organization_person(person_json)
            print(manager_person)
            working_with_persons = parse_workingwith_entries(working_json)
            print(working_with_persons)
            directs_emails = parse_direct_emails_from_organization(org_json)
            print(directs_emails)

            # Manager -> working-with persons.
            relations: List[WorkingWithRelation] = [
                WorkingWithRelation(source=manager_person, destination=dest)
                for dest in working_with_persons
            ]

            # Each direct report -> their working-with persons.
            for direct_email in directs_emails:
                collected_direct = await _open_profile_and_collect(page, direct_email)
                person_json_d = collected_direct.get("person") or {}
                working_json_d = collected_direct.get("workingwith") or {}
                direct_person = parse_person_from_organization_person(person_json_d)
                relations.extend(
                    WorkingWithRelation(source=direct_person, destination=dest)
                    for dest in parse_workingwith_entries(working_json_d)
                )
        finally:
            # Close the context even when a profile lookup fails part-way.
            await context.close()
    return relations
async def extract_relations_for_emails(emails: List[str]) -> List[WorkingWithRelation]:
    """Collect "working with" relations for an explicit list of people.

    Every address is looked up in turn in the same browser page.

    Args:
        emails: E-mail addresses of the profiles to open.

    Returns:
        One relation per (looked-up person, working-with person).

    Raises:
        RuntimeError: If ``AUTH_STATE_FILE`` does not exist.
    """
    if not AUTH_STATE_FILE.exists():
        raise RuntimeError("auth_state.json nicht vorhanden – bitte zuerst login-Modus ausführen.")

    async with async_playwright() as p:
        context = await p.chromium.launch_persistent_context(
            user_data_dir="user_data",
            headless=False,
            channel="chrome"
        )
        try:
            page = await context.new_page()
            relations: List[WorkingWithRelation] = []
            for email in emails:
                collected = await _open_profile_and_collect(page, email)
                person_json = collected.get("person") or {}
                working_json = collected.get("workingwith") or {}
                person = parse_person_from_organization_person(person_json)
                relations.extend(
                    WorkingWithRelation(source=person, destination=dest)
                    for dest in parse_workingwith_entries(working_json)
                )
        finally:
            # Close the context even when a profile lookup fails part-way.
            await context.close()
    return relations
def write_relations_to_csv(relations: List[WorkingWithRelation], output_path: Path) -> None:
    """Write *relations* to *output_path* as a UTF-8 CSV file with a header.

    Each relation becomes one row: the four source columns followed by the
    four destination columns. ``None`` attributes end up as empty cells. An
    existing file is overwritten.
    """
    # (column name, accessor) pairs in output order.
    columns = (
        ("source_mail", lambda rel: rel.source.email),
        ("source_displayname", lambda rel: rel.source.display_name),
        ("source_jobTitle", lambda rel: rel.source.job_title),
        ("source_department", lambda rel: rel.source.department),
        ("destination_mail", lambda rel: rel.destination.email),
        ("destination_displayname", lambda rel: rel.destination.display_name),
        ("destination_jobTitle", lambda rel: rel.destination.job_title),
        ("destination_department", lambda rel: rel.destination.department),
    )
    header = [column for column, _ in columns]

    with output_path.open("w", newline="", encoding="utf-8") as handle:
        csv_writer = csv.DictWriter(handle, fieldnames=header)
        csv_writer.writeheader()
        csv_writer.writerows(
            {column: read(relation) for column, read in columns}
            for relation in relations
        )

60
app/main.py Normal file
View File

@@ -0,0 +1,60 @@
from __future__ import annotations
import asyncio
import sys
from pathlib import Path
from .auth import ensure_login_sync
from .extractor import (
extract_relations_for_manager,
extract_relations_for_emails,
write_relations_to_csv,
)
from .aggregator import aggregate_department_links
# Directory (relative to the current working directory) that receives the
# generated CSV; it is created at import time if it does not exist yet.
OUTPUT_DIR = Path("output")
OUTPUT_DIR.mkdir(exist_ok=True)
def main(argv: list[str]) -> None:
    """Command-line entry point.

    ``argv[1]`` selects the mode (case-insensitive):

    * ``login <email>``: run the interactive login and return.
    * ``manager <manager_email>``: extract relations for a manager and the
      manager's direct reports.
    * ``emails <email1> [<email2> ...]``: extract relations for the given
      addresses.

    The two extraction modes write the result to ``OUTPUT_DIR/relations.csv``.
    Missing or invalid arguments print a message and exit with status 1.
    """
    if len(argv) < 2:
        usage_lines = (
            "Verwendung:",
            " python -m app.main login <email>",
            " python -m app.main manager <manager_email>",
            " python -m app.main emails <email1> [<email2> ...]",
        )
        for line in usage_lines:
            print(line)
        sys.exit(1)

    mode = argv[1].lower()
    mode_args = argv[2:]

    if mode == "login":
        if len(mode_args) != 1:
            print("login-Modus benötigt genau 1 E-Mail-Adresse.")
            sys.exit(1)
        ensure_login_sync(mode_args[0])
        return

    if mode == "manager":
        if len(mode_args) != 1:
            print("manager-Modus benötigt genau 1 Manager-E-Mail.")
            sys.exit(1)
        relations = asyncio.run(extract_relations_for_manager(mode_args[0]))
    elif mode == "emails":
        if not mode_args:
            print("emails-Modus benötigt mindestens 1 E-Mail-Adresse.")
            sys.exit(1)
        relations = asyncio.run(extract_relations_for_emails(mode_args))
    else:
        print(f"Unbekannter Modus: {mode}")
        sys.exit(1)

    csv_path = OUTPUT_DIR / "relations.csv"
    write_relations_to_csv(relations, csv_path)
    print(f"CSV geschrieben: {csv_path}")


if __name__ == "__main__":
    main(sys.argv)

16
app/models.py Normal file
View File

@@ -0,0 +1,16 @@
from dataclasses import dataclass
from typing import Optional
@dataclass
class Person:
    """A person as extracted from the profile or working-with JSON payloads."""

    # E-mail address; may be None because the parsers fill it via dict.get().
    email: Optional[str]
    # Name as displayed in the profile, if present in the payload.
    display_name: Optional[str]
    # Job title, if present in the payload.
    job_title: Optional[str]
    # Department, if present in the payload.
    department: Optional[str]
@dataclass
class WorkingWithRelation:
    """Directed "working with" relation between two persons."""

    # Person whose profile was opened.
    source: Person
    # Person listed in the source's working-with data.
    destination: Person

131
app/parser.py Normal file
View File

@@ -0,0 +1,131 @@
from __future__ import annotations
from typing import Any, Dict, Optional
from .models import Person
def _first_or_none(items: Optional[list]) -> Optional[dict]:
if not items:
return None
return items[0] or None
def parse_person_from_organization_person(json_obj: Dict[str, Any]) -> Person:
    """Build a ``Person`` from a profile payload.

    Expected shape (only the first entry of each list is used)::

        {
          "person": {
            "names": [
              {"value": {"displayName": "Display, Name-MGB", ...},
               "source": "Organisation"}
            ],
            "emailAddresses": [
              {"value": {"name": "display.name@mgb.ch",
                         "address": "display.name@mgb.ch"},
               "source": "Organisation"}
            ],
            "workDetails": [
              {"value": {"companyName": "Migros-Genossenschafts-Bund",
                         "jobTitle": "Title",
                         "department": "Dept",
                         "office": "HH-xx"},
               "source": "Organisation"}
            ]
          }
        }

    Missing sections, empty lists and explicit JSON ``null`` values are all
    tolerated; the affected attributes are then ``None``.

    Args:
        json_obj: Decoded JSON object, possibly empty.

    Returns:
        Person with e-mail address, display name, job title and department.
    """
    person_data = json_obj.get("person") or {}

    def first_value(section: str) -> Dict[str, Any]:
        # "value" dict of the first entry in a section, or {} if unavailable.
        entry = _first_or_none(person_data.get(section)) or {}
        return entry.get("value") or {}

    work_value = first_value("workDetails")
    return Person(
        email=first_value("emailAddresses").get("address"),
        display_name=first_value("names").get("displayName"),
        job_title=work_value.get("jobTitle"),
        department=work_value.get("department"),
    )
def parse_direct_emails_from_organization(json_obj: dict[str, Any]) -> list[str]:
    """Extract the e-mail addresses of all direct reports.

    Expected shape::

        {
          "managers": [...],
          "directs": [
            {"smtp": "direct.report@mgb.ch",
             "userPrincipalName": "direct.report@mgb.ch",
             ...}
          ]
        }

    For every entry ``smtp`` is preferred and ``userPrincipalName`` is the
    fallback; entries with neither are skipped. A missing or ``null``
    ``directs`` value yields an empty list.

    Args:
        json_obj: Decoded organization JSON object, possibly empty.

    Returns:
        The addresses in payload order.
    """
    directs = json_obj.get("directs") or []
    candidates = (
        item.get("smtp") or item.get("userPrincipalName") for item in directs
    )
    return [email for email in candidates if email]
def parse_workingwith_entries(json_obj: dict[str, Any]) -> list[Person]:
    """Convert a working-with payload into ``Person`` objects.

    Expected shape::

        {
          "value": [
            {"email": "user@migros.ch",
             "userPrincipalName": "user@migros.ch",
             "fullName": "Nachname, Vorname-MIGROS",
             "jobTitle": "Leitung",
             "department": "Dept 1",
             ...}
          ]
        }

    ``email`` is preferred and ``userPrincipalName`` is the fallback. Entries
    are never dropped, so a person without any address gets ``email=None``.
    A missing or ``null`` ``value`` yields an empty list.

    Args:
        json_obj: Decoded working-with JSON object, possibly empty.

    Returns:
        One ``Person`` per entry, in payload order.
    """
    entries = json_obj.get("value") or []
    return [
        Person(
            email=item.get("email") or item.get("userPrincipalName"),
            display_name=item.get("fullName"),
            job_title=item.get("jobTitle"),
            department=item.get("department"),
        )
        for item in entries
    ]

18
install.bat Normal file
View File

@@ -0,0 +1,18 @@
@echo off
setlocal

REM Create the virtual environment
python -m venv venv || goto :error

REM Activate the virtual environment
call venv\Scripts\activate.bat || goto :error

REM Install the dependencies. pip is upgraded through "python -m pip"
REM because on Windows pip cannot replace its own running executable.
python -m pip install --upgrade pip || goto :error
python -m pip install -r requirements.txt || goto :error

REM Install the Playwright browser
playwright install chromium || goto :error

echo Installation abgeschlossen.
endlocal
exit /b 0

:error
REM A step failed: do not report success and return a non-zero exit code.
echo Installation fehlgeschlagen.
endlocal
exit /b 1

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
playwright
pyvis
plotly
pandas

17
run.bat Normal file
View File

@@ -0,0 +1,17 @@
@echo off
setlocal

REM Activate the virtual environment
call venv\Scripts\activate.bat

REM Without arguments only print the usage. %~1 strips surrounding quotes,
REM so a quoted first argument cannot break the comparison.
if "%~1"=="" (
echo Nutzung:
echo run.bat login ^<email^>
echo run.bat manager ^<manager_email^>
echo run.bat emails ^<email1^> [^<email2^> ...]
goto :eof
)

REM Forward all arguments unchanged to the Python entry point
python -m app.main %*
endlocal

15
visualize.bat Normal file
View File

@@ -0,0 +1,15 @@
@echo off
setlocal

REM Activate the virtual environment
call venv\Scripts\activate.bat

REM Without arguments only print the usage. %~1 strips surrounding quotes,
REM so a quoted path cannot break the comparison.
if "%~1"=="" (
echo Nutzung:
echo visualize.bat ^<path_to_csv^>
goto :eof
)

REM "python -m" expects a module name, not a file name, so there is no ".py"
python -m app.easyvisualize %*
endlocal