# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "marimo",
#     "httpx",
#     "polars",
# ]
# ///

import marimo

__generated_with = "0.10.0"
app = marimo.App(width="full")


@app.cell(hide_code=True)
def _():
    import marimo as mo
    import httpx
    import json
    import hashlib
    import csv
    import io
    import os
    import time
    from pathlib import Path
    import polars as pl
    return Path, csv, hashlib, httpx, io, json, mo, os, pl, time


@app.cell(hide_code=True)
def _(Path, hashlib, json):
    # On-disk JSON cache: one file per (provider, normalised email) inside a
    # "cache" directory next to this notebook.
    _dir = Path(__file__).parent / "cache"
    _dir.mkdir(exist_ok=True)

    def _cache_path(provider, email):
        """Return the cache file path for *provider* and *email*.

        The email is lower-cased and stripped before hashing, so lookups are
        insensitive to case and surrounding whitespace.
        """
        digest = hashlib.sha256(email.lower().strip().encode()).hexdigest()[:16]
        return _dir / f"{provider}_{digest}.json"

    def read_cache(provider: str, email: str) -> dict | None:
        """Return the cached response for *email*, or None on a cache miss.

        A missing, unreadable or corrupt cache file counts as a miss so that
        one bad file cannot abort a whole enrichment run.
        """
        try:
            return json.loads(_cache_path(provider, email).read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # OSError: file absent/unreadable; ValueError: invalid JSON or bytes.
            return None

    def write_cache(provider: str, email: str, data: dict):
        """Persist *data* as pretty-printed JSON for (*provider*, *email*)."""
        _cache_path(provider, email).write_text(
            json.dumps(data, indent=2, default=str), encoding="utf-8"
        )

    return read_cache, write_cache


@app.cell(hide_code=True)
def _(mo, os):
    # API key inputs, pre-filled from the environment when available.
    apollo_key = mo.ui.text(
        label="Apollo", kind="password", value=os.environ.get("APOLLO_API_KEY", "")
    )
    pdl_key = mo.ui.text(
        label="PeopleDataLabs", kind="password", value=os.environ.get("PDL_API_KEY", "")
    )
    fullenrich_key = mo.ui.text(
        label="FullEnrich",
        kind="password",
        value=os.environ.get("FULLENRICH_API_KEY", ""),
    )
    mo.vstack([
        mo.md("## API Keys"),
        mo.hstack([apollo_key, pdl_key, fullenrich_key], widths="equal"),
    ])
    return apollo_key, fullenrich_key, pdl_key


@app.cell(hide_code=True)
def _(mo):
    # Input widgets: a single-person form and a free-text batch area.
    email_input = mo.ui.text(label="Email (required)", full_width=True)
    first_name_input = mo.ui.text(label="First name")
    last_name_input = mo.ui.text(label="Last name")
    linkedin_input = mo.ui.text(label="LinkedIn URL", full_width=True)
    domain_input = mo.ui.text(label="Domain")
    batch_input = mo.ui.text_area(
        label="One email per line, or CSV: email,first_name,last_name,linkedin_url,domain",
        full_width=True,
        rows=5,
    )
    input_tabs = mo.ui.tabs({
        "Single person": mo.vstack([
            email_input,
            mo.hstack([first_name_input, last_name_input, domain_input]),
            linkedin_input,
        ]),
        "Batch": batch_input,
    })
    run_btn = mo.ui.run_button(label="Enrich")
    mo.vstack([mo.md("## Person(s) to Enrich"), input_tabs, run_btn])
    return (
        batch_input,
        domain_input,
        email_input,
        first_name_input,
        last_name_input,
        linkedin_input,
        run_btn,
    )


@app.cell(hide_code=True)
def _(
    mo,
    run_btn,
    email_input,
    first_name_input,
    last_name_input,
    linkedin_input,
    domain_input,
    batch_input,
    csv,
    io,
):
    # Build `people`: a list of dicts that always carry "email" and optionally
    # "first_name", "last_name", "linkedin_url" and "domain".
    mo.stop(not run_btn.value, mo.md("*Click 'Enrich' to start*"))

    _FIELDS = ("email", "first_name", "last_name", "linkedin_url", "domain")

    def _to_person(values):
        """Map positional *values* onto field names, dropping blank entries.

        Returns None when the email column is blank, since every provider
        lookup and cache entry is keyed by email.
        """
        person = {
            field: value.strip()
            for field, value in zip(_FIELDS, values)
            if value.strip()
        }
        return person if "email" in person else None

    people = []
    if batch_input.value.strip():
        # Batch text takes precedence over the single-person form.
        for _line in batch_input.value.strip().splitlines():
            _line = _line.strip()
            if not _line:
                continue
            # Lines containing a comma are parsed as CSV (so quoted fields
            # work); anything else is taken as a bare email address.
            _fields = next(csv.reader(io.StringIO(_line)), []) if "," in _line else [_line]
            if _fields and _fields[0].strip().lower() == "email":
                continue  # a pasted CSV header row is not a person
            _p = _to_person(_fields)
            if _p is not None:
                people.append(_p)
    else:
        _p = _to_person([
            email_input.value,
            first_name_input.value,
            last_name_input.value,
            linkedin_input.value,
            domain_input.value,
        ])
        if _p is not None:
            people.append(_p)

    mo.md(f"**Enriching {len(people)} person(s):** {', '.join(_x['email'] for _x in people)}")
    return (people,)


@app.cell(hide_code=True)
def _():
    # Extractors flatten each provider's raw response into one common row
    # shape: name, title, company, industry, location, linkedin, phones,
    # found_emails. Every value is a string so the rows form a uniform table.

    def _text(value):
        """Return *value* when it is a string, otherwise an empty string."""
        return value if isinstance(value, str) else ""

    def _texts(values, *keys):
        """Normalise a provider field into a list of unique, non-empty strings.

        Accepts a single string, a single dict, or a list/tuple of either.
        For dict items the first truthy value among *keys* is used. Anything
        else (None, booleans, numbers) is ignored instead of raising.
        """
        if isinstance(values, (str, dict)):
            values = [values]
        elif not isinstance(values, (list, tuple)):
            return []
        found = []
        for item in values:
            if isinstance(item, dict):
                item = next((item[k] for k in keys if item.get(k)), "")
            if isinstance(item, str) and item and item not in found:
                found.append(item)
        return found

    def extract_apollo(data):
        """Flatten an Apollo people/match response into the common row shape."""
        p = data.get("person") or {}
        org = p.get("organization") or {}
        loc = (_text(p.get(k)) for k in ("city", "state", "country"))
        # NOTE(review): Apollo phone entries appear to be objects rather than
        # plain strings; the key names below are assumed -- confirm against
        # the API docs. Plain strings are handled as well.
        phones = _texts(
            p.get("phone_numbers"), "sanitized_number", "raw_number", "number"
        )
        return {
            "name": _text(p.get("name")),
            "title": _text(p.get("title")),
            "company": _text(org.get("name")),
            "industry": _text(org.get("industry")),
            "location": ", ".join(x for x in loc if x),
            "linkedin": _text(p.get("linkedin_url")),
            "phones": ", ".join(phones),
            "found_emails": _text(p.get("email")),
        }

    def extract_pdl(data):
        """Flatten a PeopleDataLabs enrich response into the common row shape."""
        # The record is read from "data" when present, else from the top level.
        d = data.get("data") or data
        # Prefer the mobile number; otherwise fall back to up to three others.
        phones = _text(d.get("mobile_phone"))
        if not phones:
            phones = ", ".join(_texts(d.get("phone_numbers"))[:3])
        # Work email first, then at most two personal addresses.
        emails_parts = _texts(d.get("work_email")) + _texts(d.get("personal_emails"))[:2]
        return {
            "name": _text(d.get("full_name")),
            "title": _text(d.get("job_title")),
            "company": _text(d.get("job_company_name")),
            "industry": _text(d.get("job_company_industry")),
            "location": _text(d.get("location_name")),
            "linkedin": _text(d.get("linkedin_url")),
            "phones": phones,
            "found_emails": ", ".join(emails_parts),
        }

    def extract_fullenrich(data):
        """Flatten one FullEnrich bulk-result item into the common row shape."""
        ci = data.get("contact_info") or {}
        prof = data.get("profile") or {}
        inp = data.get("input") or {}
        # NOTE(review): email/phone entries may be strings or objects depending
        # on the API version ("email"/"number" keys assumed) -- confirm.
        # Order: most probable work email, other work emails, personal email;
        # dict.fromkeys de-duplicates while keeping that order.
        emails_parts = list(dict.fromkeys(
            _texts(ci.get("most_probable_work_email"), "email")
            + _texts(ci.get("work_emails"), "email")
            + _texts(ci.get("personal_email"), "email")
        ))
        return {
            "name": _text(prof.get("full_name")),
            "title": _text(prof.get("headline")),
            "company": _text(inp.get("company_name")),
            "industry": "",
            "location": _text(prof.get("location")),
            "linkedin": _text(inp.get("linkedin_url")),
            "phones": ", ".join(_texts(ci.get("phones"), "number")),
            "found_emails": ", ".join(emails_parts),
        }

    return extract_apollo, extract_fullenrich, extract_pdl


@app.cell(hide_code=True)
def _(mo, people, apollo_key, httpx, read_cache, write_cache):
    # Apollo: one people/match request per person, served from cache if present.
    # Failures are stored as dicts containing an "error" key.
    apollo_results = {}
    _msg = "*Apollo: no API key set*"
    try:
        if apollo_key.value:
            for _person in people:
                _email = _person["email"]
                _cached = read_cache("apollo", _email)
                if _cached is not None:
                    apollo_results[_email] = _cached
                    continue
                _payload = {k: v for k, v in _person.items() if v}
                try:
                    _r = httpx.post(
                        "https://api.apollo.io/api/v1/people/match",
                        headers={
                            "Content-Type": "application/json",
                            "x-api-key": apollo_key.value,
                        },
                        json=_payload,
                        timeout=30,
                    )
                    if _r.status_code == 200:
                        _data = _r.json()
                        apollo_results[_email] = _data
                        # Only successful lookups are cached.
                        write_cache("apollo", _email, _data)
                    else:
                        apollo_results[_email] = {"error": _r.status_code, "body": _r.text}
                except Exception as _err:
                    # Per-person failure: record it and continue with the rest.
                    apollo_results[_email] = {"error": str(_err)}
            _msg = f"**Apollo:** {len(apollo_results)} result(s)"
    except Exception as _outer_err:
        _msg = f"**Apollo: error** {_outer_err}"
    mo.md(_msg)
    return (apollo_results,)


@app.cell(hide_code=True)
def _(mo, people, pdl_key, httpx, read_cache, write_cache):
    # PeopleDataLabs: one person/enrich request per email, cached on success.
    pdl_results = {}
    _msg = "*PDL: no API key set*"
    try:
        if pdl_key.value:
            for _person in people:
                _email = _person["email"]
                _cached = read_cache("pdl", _email)
                if _cached is not None:
                    pdl_results[_email] = _cached
                    continue
                try:
                    _r = httpx.get(
                        "https://api.peopledatalabs.com/v5/person/enrich",
                        headers={"X-Api-Key": pdl_key.value},
                        params={"email": _email, "min_likelihood": 5},
                        timeout=30,
                    )
                    _data = _r.json()
                    if _r.status_code == 200:
                        pdl_results[_email] = _data
                        write_cache("pdl", _email, _data)
                    elif isinstance(_data, dict) and "error" in _data:
                        # Body already carries the "error" marker that the
                        # comparison table uses to skip failed lookups.
                        pdl_results[_email] = _data
                    else:
                        # Guarantee the "error" marker on every failure.
                        pdl_results[_email] = {"error": _r.status_code, "body": _data}
                except Exception as _err:
                    pdl_results[_email] = {"error": str(_err)}
            _msg = f"**PDL:** {len(pdl_results)} result(s)"
    except Exception as _outer_err:
        _msg = f"**PDL: error** {_outer_err}"
    mo.md(_msg)
    return (pdl_results,)


@app.cell(hide_code=True)
def _(mo, people, fullenrich_key, httpx, read_cache, write_cache, time):
    # FullEnrich: submit all uncached people as one bulk job, then poll it.
    fullenrich_results = {}
    _msg = "*FullEnrich: no API key set*"
    try:
        if fullenrich_key.value:
            # Check cache first
            _uncached = []
            for _person in people:
                _email = _person["email"]
                _cached = read_cache("fullenrich", _email)
                if _cached is not None:
                    fullenrich_results[_email] = _cached
                else:
                    _uncached.append(_person)

            # Build batch for uncached people (need name+domain or linkedin_url)
            _batch = []
            for _person in _uncached:
                _entry = {
                    "enrich_fields": [
                        "contact.emails",
                        "contact.phones",
                        "contact.personal_emails",
                    ],
                    # Echoed back in the result; used to map items to emails.
                    "custom": {"email": _person["email"]},
                }
                _has_id = False
                # Fall back to the email's domain; partition() yields "" for
                # an address without "@" instead of raising.
                _domain = _person.get("domain") or _person["email"].partition("@")[2]
                if _person.get("first_name") and _person.get("last_name") and _domain:
                    _entry["first_name"] = _person["first_name"]
                    _entry["last_name"] = _person["last_name"]
                    _entry["domain"] = _domain
                    _has_id = True
                if _person.get("linkedin_url"):
                    _entry["linkedin_url"] = _person["linkedin_url"]
                    _has_id = True
                if _has_id:
                    _batch.append(_entry)
                else:
                    fullenrich_results[_person["email"]] = {
                        "error": "FullEnrich needs name+domain or linkedin_url"
                    }

            if _batch:
                try:
                    _r = httpx.post(
                        "https://app.fullenrich.com/api/v2/contact/enrich/bulk",
                        headers={
                            "Authorization": f"Bearer {fullenrich_key.value}",
                            "Content-Type": "application/json",
                        },
                        json={"name": "enrichment-comparison", "data": _batch},
                        timeout=30,
                    )
                    if _r.status_code == 200:
                        _eid = _r.json().get("enrichment_id")
                        # Recorded for every batch entry that ends up without
                        # a result, so no person is silently dropped.
                        _failure = {
                            "error": "FullEnrich enrichment did not finish within the polling window"
                        }
                        if not _eid:
                            _failure = {
                                "error": "FullEnrich response had no enrichment_id",
                                "body": _r.text,
                            }
                        else:
                            for _attempt in range(24):  # poll up to ~120s
                                time.sleep(5)
                                _poll = httpx.get(
                                    f"https://app.fullenrich.com/api/v2/contact/enrich/bulk/{_eid}",
                                    headers={"Authorization": f"Bearer {fullenrich_key.value}"},
                                    timeout=30,
                                )
                                if _poll.status_code == 200:
                                    _result = _poll.json()
                                    if _result.get("status") == "FINISHED":
                                        for _item in _result.get("data", []):
                                            _em = (_item.get("custom") or {}).get("email")
                                            if _em:
                                                fullenrich_results[_em] = _item
                                                write_cache("fullenrich", _em, _item)
                                        _failure = {
                                            "error": "FullEnrich returned no result for this contact"
                                        }
                                        break
                                elif _poll.status_code != 400:  # 400 = still in progress
                                    _failure = {
                                        "error": _poll.status_code,
                                        "body": _poll.text,
                                    }
                                    break
                        for _entry in _batch:
                            fullenrich_results.setdefault(
                                _entry["custom"]["email"], dict(_failure)
                            )
                    else:
                        for _entry in _batch:
                            fullenrich_results[_entry["custom"]["email"]] = {
                                "error": _r.status_code,
                                "body": _r.text,
                            }
                except Exception as _err:
                    # setdefault keeps any results stored before the failure.
                    for _entry in _batch:
                        fullenrich_results.setdefault(
                            _entry["custom"]["email"], {"error": str(_err)}
                        )
            _msg = f"**FullEnrich:** {len(fullenrich_results)} result(s)"
    except Exception as _outer_err:
        _msg = f"**FullEnrich: error** {_outer_err}"
    mo.md(_msg)
    return (fullenrich_results,)


@app.cell(hide_code=True)
def _(
    mo,
    people,
    apollo_results,
    pdl_results,
    fullenrich_results,
    extract_apollo,
    extract_pdl,
    extract_fullenrich,
    pl,
):
    # One table row per (person, provider) pair that has a non-error result.
    _providers = [
        ("Apollo", apollo_results, extract_apollo),
        ("PDL", pdl_results, extract_pdl),
        ("FullEnrich", fullenrich_results, extract_fullenrich),
    ]
    _rows = []
    for _person in people:
        _email = _person["email"]
        for _provider, _results, _extractor in _providers:
            _raw = _results.get(_email)
            if _raw is not None and "error" not in _raw:
                _rows.append({"email": _email, "provider": _provider, **_extractor(_raw)})

    comparison_df = pl.DataFrame(_rows) if _rows else None

    # marimo renders only the final top-level expression of a cell; an
    # expression nested inside if/else is never displayed, so choose the view
    # first and end the cell on a bare expression.
    _view = (
        mo.vstack([mo.md("## Comparison"), mo.ui.table(comparison_df)])
        if comparison_df is not None
        else mo.md("## Comparison\n\n*No results yet*")
    )
    _view
    return (comparison_df,)


@app.cell(hide_code=True)
def _(mo, apollo_results, pdl_results, fullenrich_results, json):
    # Raw provider responses, one tab each, shown as fenced JSON.
    def _fmt(d):
        """Render *d* as a fenced, pretty-printed JSON markdown block."""
        return mo.md(f"```json\n{json.dumps(d, indent=2, default=str)}\n```")

    mo.vstack([
        mo.md("## Raw Results"),
        mo.ui.tabs({
            "Apollo": _fmt(apollo_results),
            "PDL": _fmt(pdl_results),
            "FullEnrich": _fmt(fullenrich_results),
        }),
    ])
    return


@app.cell(hide_code=True)
def _(mo, comparison_df, apollo_results, pdl_results, fullenrich_results, json):
    # Download buttons: comparison CSV (only when a table exists) and raw JSON.
    _items = []
    if comparison_df is not None:
        _csv_bytes = comparison_df.write_csv().encode()
        _items.append(
            mo.download(_csv_bytes, filename="enrichment_comparison.csv", label="Download CSV")
        )
    _raw = json.dumps(
        {"apollo": apollo_results, "pdl": pdl_results, "fullenrich": fullenrich_results},
        indent=2,
        default=str,
    ).encode()
    _items.append(
        mo.download(_raw, filename="enrichment_raw.json", label="Download Raw JSON")
    )
    mo.vstack([mo.md("## Export"), mo.hstack(_items)])
    return


if __name__ == "__main__":
    app.run()