132 lines
3.5 KiB
Python
132 lines
3.5 KiB
Python
from __future__ import annotations
|
|
from typing import Any, Dict, Optional
|
|
from .models import Person
|
|
|
|
def _first_or_none(items: Optional[list]) -> Optional[dict]:
|
|
if not items:
|
|
return None
|
|
return items[0] or None
|
|
|
|
def parse_person_from_organization_person(json_obj: Dict[str, Any]) -> Person:
|
|
"""
|
|
Erwartet ein Objekt der Form:
|
|
|
|
{
|
|
"person": {
|
|
"names": [
|
|
{
|
|
"value": {
|
|
"displayName": "Display, Name-MGB",
|
|
"givenName": "Name",
|
|
"surname": "Display"
|
|
},
|
|
"source": "Organisation"
|
|
}
|
|
],
|
|
"emailAddresses": [
|
|
{
|
|
"value": {
|
|
"name": "display.name@mgb.ch",
|
|
"address": "display.name@mgb.ch"
|
|
},
|
|
"source": "Organisation"
|
|
}
|
|
],
|
|
"workDetails": [
|
|
{
|
|
"value": {
|
|
"companyName": "Migros-Genossenschafts-Bund",
|
|
"jobTitle": "Title",
|
|
"department": "Dept",
|
|
"office": "HH-xx"
|
|
},
|
|
"source": "Organisation"
|
|
}
|
|
]
|
|
}
|
|
}
|
|
"""
|
|
|
|
person_data = json_obj.get("person", {})
|
|
|
|
# Namen
|
|
names_entry = _first_or_none(person_data.get("names"))
|
|
names_value = (names_entry or {}).get("value", {}) if names_entry else {}
|
|
display_name = names_value.get("displayName")
|
|
|
|
# E-Mail
|
|
mail_entry = _first_or_none(person_data.get("emailAddresses"))
|
|
mail_value = (mail_entry or {}).get("value", {}) if mail_entry else {}
|
|
email = mail_value.get("address")
|
|
|
|
# WorkDetails: Firma, Job, Abteilung, Büro
|
|
work_entry = _first_or_none(person_data.get("workDetails"))
|
|
work_value = (work_entry or {}).get("value", {}) if work_entry else {}
|
|
company_name = work_value.get("companyName")
|
|
job_title = work_value.get("jobTitle")
|
|
department = work_value.get("department")
|
|
office = work_value.get("office")
|
|
|
|
return Person(
|
|
email=email,
|
|
display_name=display_name,
|
|
job_title=job_title,
|
|
department=department,
|
|
)
|
|
|
|
|
|
def parse_direct_emails_from_organization(json_obj: dict[str, Any]) -> List[str]:
|
|
"""
|
|
Erwartete Struktur (aus organization_anonymized):
|
|
|
|
{
|
|
"managers": [...],
|
|
"directs": [
|
|
{
|
|
"smtp": "direct.report@mgb.ch",
|
|
"userPrincipalName": "direct.report@mgb.ch",
|
|
...
|
|
}
|
|
]
|
|
}
|
|
"""
|
|
directs = json_obj.get("directs", [])
|
|
emails: List[str] = []
|
|
for item in directs:
|
|
email = item.get("smtp") or item.get("userPrincipalName")
|
|
if email:
|
|
emails.append(email)
|
|
return emails
|
|
|
|
|
|
def parse_workingwith_entries(json_obj: dict[str, Any]) -> List[Person]:
|
|
"""
|
|
Erwartete Struktur (aus working_with_anonymized):
|
|
|
|
{
|
|
"value": [
|
|
{
|
|
"email": "user@migros.ch",
|
|
"userPrincipalName": "user@migros.ch",
|
|
"fullName": "Nachname, Vorname-MIGROS",
|
|
"jobTitle": "Leitung",
|
|
"department": "Dept 1",
|
|
...
|
|
}
|
|
]
|
|
}
|
|
"""
|
|
value = json_obj.get("value", [])
|
|
persons: List[Person] = []
|
|
for item in value:
|
|
email = item.get("email") or item.get("userPrincipalName")
|
|
persons.append(
|
|
Person(
|
|
email=email,
|
|
display_name=item.get("fullName"),
|
|
job_title=item.get("jobTitle"),
|
|
department=item.get("department"),
|
|
)
|
|
)
|
|
return persons
|