Files
enrichment-compare/comparison.py
Michał Flak 51120ec6f2 deslop
2026-02-24 02:36:30 +01:00

343 lines
12 KiB
Python

# /// script
# requires-python = ">=3.12"
# dependencies = [
# "marimo",
# "httpx",
# ]
# ///
import marimo
# Version string recorded in the notebook file (presumably the marimo release
# that last wrote it — confirm before bumping by hand).
__generated_with = "0.10.0"
# The notebook application object; every cell below registers itself on it
# through the @app.cell decorator. width="full" is passed through to marimo.
app = marimo.App(width="full")
@app.cell(hide_code=True)
def _():
    # Central import cell: every module used by the other cells is imported
    # here once and handed out through the return tuple.

    # Standard library.
    import csv
    import hashlib
    import io
    import json
    import os
    import time
    from pathlib import Path

    # Third-party.
    import httpx
    import marimo as mo

    return Path, csv, hashlib, httpx, io, json, mo, os, time
@app.cell(hide_code=True)
def _(Path, hashlib, json):
    # On-disk response cache: one JSON file per (provider, email) pair, kept in
    # a "cache" directory next to this notebook file.
    _dir = Path(__file__).parent / "cache"
    _dir.mkdir(exist_ok=True)

    def _cache_path(provider: str, email: str):
        """Return the cache file path for a provider/email pair.

        The email is lower-cased and stripped before hashing, so casing and
        surrounding whitespace do not produce separate cache entries. Only the
        first 16 hex characters of the SHA-256 digest are used in the name.
        """
        _h = hashlib.sha256(email.lower().strip().encode()).hexdigest()[:16]
        return _dir / f"{provider}_{_h}.json"

    def read_cache(provider: str, email: str) -> dict | None:
        """Return the cached response for this provider/email, or None.

        A missing file is a cache miss. A file that cannot be decoded
        (truncated or corrupt JSON) is also treated as a miss, so one bad
        entry does not abort the whole provider run; the entry is simply
        overwritten by the next successful write.
        """
        try:
            return json.loads(_cache_path(provider, email).read_text())
        except (FileNotFoundError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return None

    def write_cache(provider: str, email: str, data: dict) -> None:
        """Store `data` as indented JSON for this provider/email.

        Values json cannot serialise natively are stringified (default=str).
        """
        _cache_path(provider, email).write_text(
            json.dumps(data, indent=2, default=str)
        )

    return read_cache, write_cache
@app.cell(hide_code=True)
def _(mo, os):
    # API key inputs: one password field per provider, each pre-filled from
    # its environment variable (empty string when the variable is unset).
    def _password_field(label, env_var):
        return mo.ui.text(
            label=label,
            kind="password",
            value=os.environ.get(env_var, ""),
        )

    apollo_key = _password_field("Apollo", "APOLLO_API_KEY")
    pdl_key = _password_field("PeopleDataLabs", "PDL_API_KEY")
    fullenrich_key = _password_field("FullEnrich", "FULLENRICH_API_KEY")

    _key_row = mo.hstack([apollo_key, pdl_key, fullenrich_key], widths="equal")
    mo.vstack([mo.md("## API Keys"), _key_row])
    return apollo_key, fullenrich_key, pdl_key
@app.cell(hide_code=True)
def _(mo):
    # Input form: a "Single person" tab with individual fields, a "Batch" tab
    # with a free-text area, and the button that triggers enrichment.
    email_input = mo.ui.text(label="Email (required)", full_width=True)
    first_name_input = mo.ui.text(label="First name")
    last_name_input = mo.ui.text(label="Last name")
    linkedin_input = mo.ui.text(label="LinkedIn URL", full_width=True)
    domain_input = mo.ui.text(label="Domain")
    batch_input = mo.ui.text_area(
        label="One email per line, or CSV: email,first_name,last_name,linkedin_url,domain",
        full_width=True,
        rows=5,
    )

    # Single-person layout: email on top, name/domain side by side, LinkedIn below.
    _name_row = mo.hstack([first_name_input, last_name_input, domain_input])
    _single_form = mo.vstack([email_input, _name_row, linkedin_input])
    input_tabs = mo.ui.tabs({
        "Single person": _single_form,
        "Batch": batch_input,
    })

    run_btn = mo.ui.run_button(label="Enrich")
    mo.vstack([mo.md("## Person(s) to Enrich"), input_tabs, run_btn])
    return (
        batch_input, domain_input, email_input,
        first_name_input, last_name_input,
        linkedin_input, run_btn,
    )
@app.cell(hide_code=True)
def _(
    mo, run_btn,
    email_input, first_name_input, last_name_input, linkedin_input, domain_input,
    batch_input, csv, io,
):
    # Build `people`: a list of dicts that always carry "email" and optionally
    # "first_name", "last_name", "linkedin_url" and "domain". Non-empty batch
    # text takes precedence over the single-person fields.
    mo.stop(not run_btn.value, mo.md("*Click 'Enrich' to start*"))

    # Optional fields, in the order they follow the email in a CSV row.
    _optional = ("first_name", "last_name", "linkedin_url", "domain")

    people = []
    if batch_input.value.strip():
        for _line in batch_input.value.strip().splitlines():
            _line = _line.strip()
            if not _line:
                continue
            # Lines with a comma are parsed as CSV (so quoted fields work);
            # any other line is taken verbatim as a bare email.
            _rows = csv.reader(io.StringIO(_line)) if "," in _line else [[_line]]
            for _row in _rows:
                _cells = [_cell.strip() for _cell in _row]
                _email = _cells[0] if _cells else ""
                # Skip rows without an email and a pasted header row
                # ("email,first_name,..."): neither is a person, and an empty
                # email would be sent to every provider and cached.
                if not _email or _email.lower() == "email":
                    continue
                _p = {"email": _email}
                # zip stops at the shorter side, so short rows simply omit
                # the trailing optional fields; blank cells are omitted too.
                for _key, _val in zip(_optional, _cells[1:]):
                    if _val:
                        _p[_key] = _val
                people.append(_p)
    elif email_input.value.strip():
        _p = {"email": email_input.value.strip()}
        _widgets = (first_name_input, last_name_input, linkedin_input, domain_input)
        for _key, _widget in zip(_optional, _widgets):
            _val = _widget.value.strip()
            if _val:
                _p[_key] = _val
        people.append(_p)

    mo.md(f"**Enriching {len(people)} person(s):** {', '.join(_x['email'] for _x in people)}")
    return (people,)
@app.cell(hide_code=True)
def _(mo, people, apollo_key, httpx, read_cache, write_cache):
    # Apollo enrichment: one POST per person that is not already cached.
    # `apollo_results` maps email -> response JSON, or an {"error": ...} dict.
    apollo_results = {}
    _msg = "*Apollo: no API key set*"
    try:
        if apollo_key.value:
            for _person in people:
                _email = _person["email"]
                _hit = read_cache("apollo", _email)
                if _hit is not None:
                    apollo_results[_email] = _hit
                    continue

                # Send only the fields that actually have a value.
                _payload = {_k: _v for _k, _v in _person.items() if _v}
                try:
                    _resp = httpx.post(
                        "https://api.apollo.io/api/v1/people/match",
                        headers={
                            "Content-Type": "application/json",
                            "x-api-key": apollo_key.value,
                        },
                        json=_payload,
                        timeout=30,
                    )
                    if _resp.status_code != 200:
                        apollo_results[_email] = {
                            "error": _resp.status_code,
                            "body": _resp.text,
                        }
                    else:
                        # Only successful responses are written to the cache.
                        _data = _resp.json()
                        apollo_results[_email] = _data
                        write_cache("apollo", _email, _data)
                except Exception as _err:
                    # A failure for one person must not stop the others.
                    apollo_results[_email] = {"error": str(_err)}
            _msg = f"**Apollo:** {len(apollo_results)} result(s)"
    except Exception as _err:
        _msg = f"**Apollo: error** {_err}"
    mo.md(_msg)
    return (apollo_results,)
@app.cell(hide_code=True)
def _(mo, people, pdl_key, httpx, read_cache, write_cache):
    # PeopleDataLabs enrichment: one GET per person that is not already cached.
    # `pdl_results` maps email -> response JSON (whatever the HTTP status), or
    # an {"error": ...} dict when the request or JSON decoding raised.
    pdl_results = {}
    _msg = "*PDL: no API key set*"

    # Person field -> PDL query parameter name.
    _param_names = (
        ("first_name", "first_name"),
        ("last_name", "last_name"),
        ("linkedin_url", "profile"),
        ("domain", "website"),
    )
    try:
        if pdl_key.value:
            for _person in people:
                _email = _person["email"]
                _hit = read_cache("pdl", _email)
                if _hit is not None:
                    pdl_results[_email] = _hit
                    continue
                try:
                    _params = {"email": _email, "min_likelihood": 5}
                    for _field, _param in _param_names:
                        if _person.get(_field):
                            _params[_param] = _person[_field]
                    _resp = httpx.get(
                        "https://api.peopledatalabs.com/v5/person/enrich",
                        headers={"X-Api-Key": pdl_key.value},
                        params=_params,
                        timeout=30,
                    )
                    _data = _resp.json()
                    pdl_results[_email] = _data
                    # The body is always shown, but only 200 responses are cached.
                    if _resp.status_code == 200:
                        write_cache("pdl", _email, _data)
                except Exception as _err:
                    # A failure for one person must not stop the others.
                    pdl_results[_email] = {"error": str(_err)}
            _msg = f"**PDL:** {len(pdl_results)} result(s)"
    except Exception as _err:
        _msg = f"**PDL: error** {_err}"
    mo.md(_msg)
    return (pdl_results,)
@app.cell(hide_code=True)
def _(mo, people, fullenrich_key, httpx, read_cache, write_cache, time):
    # FullEnrich enrichment: cached people are served from disk, the rest are
    # submitted as one bulk job that is then polled until it finishes.
    # `fullenrich_results` maps email -> result item, or an {"error": ...} dict.
    # Every person that was looked up ends with an entry: a submitted contact
    # that never receives a result gets an explicit error instead of silently
    # disappearing from the output.
    fullenrich_results = {}
    _msg = "*FullEnrich: no API key set*"
    try:
        if fullenrich_key.value:
            # Check cache first
            _uncached = []
            for _person in people:
                _email = _person["email"]
                _cached = read_cache("fullenrich", _email)
                if _cached is not None:
                    fullenrich_results[_email] = _cached
                else:
                    _uncached.append(_person)

            # Build batch for uncached people (need name+domain or linkedin_url)
            _batch = []
            for _person in _uncached:
                _entry = {
                    "enrich_fields": ["contact.emails", "contact.phones", "contact.personal_emails"],
                    # The email rides along in "custom" so results can be
                    # matched back to the person once the job finishes.
                    "custom": {"email": _person["email"]},
                }
                _has_id = False
                if _person.get("first_name") and _person.get("last_name") and _person.get("domain"):
                    _entry["first_name"] = _person["first_name"]
                    _entry["last_name"] = _person["last_name"]
                    _entry["domain"] = _person["domain"]
                    _has_id = True
                if _person.get("linkedin_url"):
                    _entry["linkedin_url"] = _person["linkedin_url"]
                    _has_id = True
                if _has_id:
                    _batch.append(_entry)
                else:
                    fullenrich_results[_person["email"]] = {
                        "error": "FullEnrich needs name+domain or linkedin_url"
                    }

            if _batch:
                # Error recorded for every batch entry that ends up without a
                # result; refined as the submit/poll sequence progresses.
                _failure = None
                try:
                    _r = httpx.post(
                        "https://app.fullenrich.com/api/v2/contact/enrich/bulk",
                        headers={
                            "Authorization": f"Bearer {fullenrich_key.value}",
                            "Content-Type": "application/json",
                        },
                        json={"name": "enrichment-comparison", "data": _batch},
                        timeout=30,
                    )
                    if _r.status_code != 200:
                        _failure = {"error": _r.status_code, "body": _r.text}
                    else:
                        _eid = _r.json().get("enrichment_id")
                        if not _eid:
                            # Without an id there is nothing to poll.
                            _failure = {
                                "error": "FullEnrich response had no enrichment_id",
                                "body": _r.text,
                            }
                        else:
                            # Stands unless the loop below finishes or fails first.
                            _failure = {
                                "error": "FullEnrich polling timed out before the enrichment finished"
                            }
                            for _attempt in range(24):  # poll up to ~120s
                                time.sleep(5)
                                _poll = httpx.get(
                                    f"https://app.fullenrich.com/api/v2/contact/enrich/bulk/{_eid}",
                                    headers={"Authorization": f"Bearer {fullenrich_key.value}"},
                                    timeout=30,
                                )
                                if _poll.status_code == 200:
                                    _result = _poll.json()
                                    if _result.get("status") == "FINISHED":
                                        for _item in _result.get("data", []):
                                            _em = (_item.get("custom") or {}).get("email")
                                            if _em:
                                                fullenrich_results[_em] = _item
                                                write_cache("fullenrich", _em, _item)
                                        # Anything still missing was not returned.
                                        _failure = {
                                            "error": "FullEnrich finished without returning a result for this contact"
                                        }
                                        break
                                elif _poll.status_code != 400:  # 400 = still in progress
                                    _failure = {
                                        "error": _poll.status_code,
                                        "body": _poll.text,
                                    }
                                    break
                except Exception as _err:
                    _failure = {"error": str(_err)}

                if _failure is not None:
                    # setdefault keeps results that were already recorded and
                    # fills only the entries that would otherwise be missing.
                    for _entry in _batch:
                        fullenrich_results.setdefault(
                            _entry["custom"]["email"], dict(_failure)
                        )
            _msg = f"**FullEnrich:** {len(fullenrich_results)} result(s)"
    except Exception as _err:
        _msg = f"**FullEnrich: error** {_err}"
    mo.md(_msg)
    return (fullenrich_results,)
@app.cell(hide_code=True)
def _(mo, apollo_results, pdl_results, fullenrich_results, json):
    # Results view: one tab per provider, each showing that provider's result
    # dict as pretty-printed JSON inside a fenced code block.
    def _as_json_block(payload):
        _text = json.dumps(payload, indent=2, default=str)
        return mo.md(f"```json\n{_text}\n```")

    _by_provider = (
        ("Apollo", apollo_results),
        ("PDL", pdl_results),
        ("FullEnrich", fullenrich_results),
    )
    _tabs = mo.ui.tabs(
        {_label: _as_json_block(_results) for _label, _results in _by_provider}
    )
    mo.vstack([mo.md("## Results"), _tabs])
    return
@app.cell(hide_code=True)
def _(mo, apollo_results, pdl_results, fullenrich_results, json):
    # Export: bundle all three providers' results into a single JSON document
    # and offer it as a download.
    _combined = {
        "apollo": apollo_results,
        "pdl": pdl_results,
        "fullenrich": fullenrich_results,
    }
    _raw = json.dumps(_combined, indent=2, default=str).encode()
    _button = mo.download(_raw, filename="enrichment_raw.json", label="Download Raw JSON")
    mo.vstack([mo.md("## Export"), _button])
    return
if __name__ == "__main__":
    # Run the notebook app when this file is executed directly as a script.
    app.run()