# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "marimo",
#     "httpx",
# ]
# ///

import marimo

__generated_with = "0.10.0"
app = marimo.App(width="full")


@app.cell(hide_code=True)
def _():
    # Shared imports, handed to the other cells through the return tuple.
    import marimo as mo
    import httpx
    import json
    import hashlib
    import csv
    import io
    import os
    import time
    from pathlib import Path
    return Path, csv, hashlib, httpx, io, json, mo, os, time


@app.cell(hide_code=True)
def _(Path, hashlib, json):
    # On-disk response cache: one JSON file per (provider, email) pair,
    # stored in a "cache" directory next to this script.
    _dir = Path(__file__).parent / "cache"
    _dir.mkdir(exist_ok=True)

    def _cache_path(provider: str, email: str) -> Path:
        """Return the cache file for *provider* and the normalized *email*."""
        # The email is lower-cased and stripped before hashing so that
        # differently-cased spellings share one cache entry.
        digest = hashlib.sha256(email.lower().strip().encode()).hexdigest()[:16]
        return _dir / f"{provider}_{digest}.json"

    def read_cache(provider: str, email: str) -> dict | None:
        """Return the cached response for *email*, or None on a cache miss.

        A missing file and a file that does not contain valid JSON (for
        example a truncated write) are both treated as a miss, so the
        caller simply fetches the data again.
        """
        try:
            return json.loads(_cache_path(provider, email).read_text(encoding="utf-8"))
        except (FileNotFoundError, json.JSONDecodeError):
            return None

    def write_cache(provider: str, email: str, data: dict) -> None:
        """Store *data* as the cached response for *provider* and *email*."""
        _cache_path(provider, email).write_text(
            json.dumps(data, indent=2, default=str),
            encoding="utf-8",
        )

    return read_cache, write_cache


@app.cell(hide_code=True)
def _(mo, os):
    # API key inputs, pre-filled from environment variables when present.
    apollo_key = mo.ui.text(
        label="Apollo",
        kind="password",
        value=os.environ.get("APOLLO_API_KEY", ""),
    )
    pdl_key = mo.ui.text(
        label="PeopleDataLabs",
        kind="password",
        value=os.environ.get("PDL_API_KEY", ""),
    )
    fullenrich_key = mo.ui.text(
        label="FullEnrich",
        kind="password",
        value=os.environ.get("FULLENRICH_API_KEY", ""),
    )
    mo.vstack([
        mo.md("## API Keys"),
        mo.hstack([apollo_key, pdl_key, fullenrich_key], widths="equal"),
    ])
    return apollo_key, fullenrich_key, pdl_key


@app.cell(hide_code=True)
def _(mo):
    # Input widgets: a single-person form and a free-text batch area.
    email_input = mo.ui.text(label="Email (required)", full_width=True)
    first_name_input = mo.ui.text(label="First name")
    last_name_input = mo.ui.text(label="Last name")
    linkedin_input = mo.ui.text(label="LinkedIn URL", full_width=True)
    domain_input = mo.ui.text(label="Domain")
    batch_input = mo.ui.text_area(
        label="One email per line, or CSV: email,first_name,last_name,linkedin_url,domain",
        full_width=True,
        rows=5,
    )
    input_tabs = mo.ui.tabs({
        "Single person": mo.vstack([
            email_input,
            mo.hstack([first_name_input, last_name_input, domain_input]),
            linkedin_input,
        ]),
        "Batch": batch_input,
    })
    run_btn = mo.ui.run_button(label="Enrich")
    mo.vstack([mo.md("## Person(s) to Enrich"), input_tabs, run_btn])
    return (
        batch_input,
        domain_input,
        email_input,
        first_name_input,
        last_name_input,
        linkedin_input,
        run_btn,
    )


@app.cell(hide_code=True)
def _(
    mo,
    run_btn,
    email_input,
    first_name_input,
    last_name_input,
    linkedin_input,
    domain_input,
    batch_input,
    csv,
    io,
):
    # Build the `people` list (one dict per person) once "Enrich" is clicked.
    # Non-empty batch text takes precedence over the single-person form.
    mo.stop(not run_btn.value, mo.md("*Click 'Enrich' to start*"))

    # Column order of a CSV batch row; also the keys of each person dict.
    _FIELDS = ("email", "first_name", "last_name", "linkedin_url", "domain")

    def _to_person(values):
        """Map positional *values* onto _FIELDS, dropping blank fields.

        Returns None when the email is blank, since every provider call
        and the cache are keyed by email.
        """
        person = {
            field: value.strip()
            for field, value in zip(_FIELDS, values)
            if value.strip()
        }
        return person if "email" in person else None

    people = []
    _batch_text = batch_input.value.strip()
    if _batch_text:
        for _line in _batch_text.splitlines():
            _line = _line.strip()
            if not _line:
                continue
            # Only lines containing a comma go through the CSV parser; any
            # other line is taken verbatim as an email address.
            if "," in _line:
                _rows = csv.reader(io.StringIO(_line))
            else:
                _rows = [[_line]]
            for _row in _rows:
                # Skip a pasted header row ("email,first_name,...").
                if _row and _row[0].strip().lower() == "email":
                    continue
                _p = _to_person(_row)
                if _p is not None:
                    people.append(_p)
    else:
        _p = _to_person([
            email_input.value,
            first_name_input.value,
            last_name_input.value,
            linkedin_input.value,
            domain_input.value,
        ])
        if _p is not None:
            people.append(_p)

    mo.md(f"**Enriching {len(people)} person(s):** {', '.join(_x['email'] for _x in people)}")
    return (people,)


@app.cell(hide_code=True)
def _(mo, people, apollo_key, httpx, read_cache, write_cache):
    # Apollo: one people/match request per person, keyed by email.
    # Only HTTP 200 responses are cached; failures are recorded per email.
    apollo_results = {}
    _msg = "*Apollo: no API key set*"
    try:
        if apollo_key.value:
            for _person in people:
                _email = _person["email"]
                _cached = read_cache("apollo", _email)
                if _cached is not None:
                    apollo_results[_email] = _cached
                    continue
                _payload = {k: v for k, v in _person.items() if v}
                # Broad catch on purpose: one failing lookup must not abort
                # the remaining people.
                try:
                    _r = httpx.post(
                        "https://api.apollo.io/api/v1/people/match",
                        headers={
                            "Content-Type": "application/json",
                            "x-api-key": apollo_key.value,
                        },
                        json=_payload,
                        timeout=30,
                    )
                    if _r.status_code == 200:
                        _data = _r.json()
                        apollo_results[_email] = _data
                        write_cache("apollo", _email, _data)
                    else:
                        apollo_results[_email] = {"error": _r.status_code, "body": _r.text}
                except Exception as _exc:
                    apollo_results[_email] = {"error": str(_exc)}
            _msg = f"**Apollo:** {len(apollo_results)} result(s)"
    except Exception as _err:
        # Provider-level boundary: report the failure in the cell output
        # instead of breaking the cells that depend on `apollo_results`.
        _msg = f"**Apollo: error** {_err}"
    mo.md(_msg)
    return (apollo_results,)


@app.cell(hide_code=True)
def _(mo, people, pdl_key, httpx, read_cache, write_cache):
    # PeopleDataLabs: one person/enrich request per person, keyed by email.
    # The JSON body is kept for every status code; only 200s are cached.
    pdl_results = {}
    _msg = "*PDL: no API key set*"
    try:
        if pdl_key.value:
            for _person in people:
                _email = _person["email"]
                _cached = read_cache("pdl", _email)
                if _cached is not None:
                    pdl_results[_email] = _cached
                    continue
                try:
                    _params = {"email": _email, "min_likelihood": 5}
                    if _person.get("first_name"):
                        _params["first_name"] = _person["first_name"]
                    if _person.get("last_name"):
                        _params["last_name"] = _person["last_name"]
                    if _person.get("linkedin_url"):
                        _params["profile"] = _person["linkedin_url"]
                    if _person.get("domain"):
                        _params["website"] = _person["domain"]
                    _r = httpx.get(
                        "https://api.peopledatalabs.com/v5/person/enrich",
                        headers={"X-Api-Key": pdl_key.value},
                        params=_params,
                        timeout=30,
                    )
                    try:
                        _data = _r.json()
                    except ValueError:
                        # Body is not JSON: keep the status code and raw
                        # text rather than only a decode error message.
                        pdl_results[_email] = {"error": _r.status_code, "body": _r.text}
                        continue
                    pdl_results[_email] = _data
                    if _r.status_code == 200:
                        write_cache("pdl", _email, _data)
                except Exception as _exc:
                    pdl_results[_email] = {"error": str(_exc)}
            _msg = f"**PDL:** {len(pdl_results)} result(s)"
    except Exception as _err:
        # Provider-level boundary: report the failure in the cell output.
        _msg = f"**PDL: error** {_err}"
    mo.md(_msg)
    return (pdl_results,)


@app.cell(hide_code=True)
def _(mo, people, fullenrich_key, httpx, read_cache, write_cache, time):
    # FullEnrich: submit all uncached people as one bulk job, then poll the
    # job until it reports FINISHED. Every person ends up with either a
    # result or an explicit error entry in `fullenrich_results`.
    fullenrich_results = {}
    _msg = "*FullEnrich: no API key set*"
    try:
        if fullenrich_key.value:
            # Check cache first
            _uncached = []
            for _person in people:
                _email = _person["email"]
                _cached = read_cache("fullenrich", _email)
                if _cached is not None:
                    fullenrich_results[_email] = _cached
                else:
                    _uncached.append(_person)

            # Build batch for uncached people (need name+domain or linkedin_url)
            _batch = []
            for _person in _uncached:
                _entry = {
                    "enrich_fields": ["contact.emails", "contact.phones", "contact.personal_emails"],
                    # The email travels in `custom` so results can be
                    # matched back to the input person.
                    "custom": {"email": _person["email"]},
                }
                _has_id = False
                if _person.get("first_name") and _person.get("last_name") and _person.get("domain"):
                    _entry["first_name"] = _person["first_name"]
                    _entry["last_name"] = _person["last_name"]
                    _entry["domain"] = _person["domain"]
                    _has_id = True
                if _person.get("linkedin_url"):
                    _entry["linkedin_url"] = _person["linkedin_url"]
                    _has_id = True
                if _has_id:
                    _batch.append(_entry)
                else:
                    fullenrich_results[_person["email"]] = {
                        "error": "FullEnrich needs name+domain or linkedin_url"
                    }

            if _batch:
                # Recorded for every submitted contact that has no result
                # once the request/poll sequence below is over. It starts
                # as the timeout outcome and is replaced by a more
                # specific reason when one is known.
                _failure = {"error": "FullEnrich polling timed out before the enrichment finished"}
                try:
                    _r = httpx.post(
                        "https://app.fullenrich.com/api/v2/contact/enrich/bulk",
                        headers={
                            "Authorization": f"Bearer {fullenrich_key.value}",
                            "Content-Type": "application/json",
                        },
                        json={"name": "enrichment-comparison", "data": _batch},
                        timeout=30,
                    )
                    if _r.status_code != 200:
                        _failure = {"error": _r.status_code, "body": _r.text}
                    else:
                        _eid = _r.json().get("enrichment_id")
                        if not _eid:
                            # Without an id there is nothing to poll.
                            _failure = {
                                "error": "FullEnrich response had no enrichment_id",
                                "body": _r.text,
                            }
                        else:
                            for _attempt in range(24):  # poll up to ~120s
                                time.sleep(5)
                                _poll = httpx.get(
                                    f"https://app.fullenrich.com/api/v2/contact/enrich/bulk/{_eid}",
                                    headers={"Authorization": f"Bearer {fullenrich_key.value}"},
                                    timeout=30,
                                )
                                if _poll.status_code == 200:
                                    _result = _poll.json()
                                    if _result.get("status") == "FINISHED":
                                        for _item in _result.get("data", []):
                                            _em = (_item.get("custom") or {}).get("email")
                                            if _em:
                                                fullenrich_results[_em] = _item
                                                write_cache("fullenrich", _em, _item)
                                        _failure = {
                                            "error": "FullEnrich finished without a result for this contact"
                                        }
                                        break
                                elif _poll.status_code != 400:  # 400 = still in progress
                                    _failure = {"error": _poll.status_code, "body": _poll.text}
                                    break
                except Exception as _exc:
                    _failure = {"error": str(_exc)}

                # Fill in an error for each submitted contact that got no
                # result; results that were already stored are kept.
                for _entry in _batch:
                    fullenrich_results.setdefault(_entry["custom"]["email"], dict(_failure))
            _msg = f"**FullEnrich:** {len(fullenrich_results)} result(s)"
    except Exception as _err:
        # Provider-level boundary: report the failure in the cell output.
        _msg = f"**FullEnrich: error** {_err}"
    mo.md(_msg)
    return (fullenrich_results,)


@app.cell(hide_code=True)
def _(mo, apollo_results, pdl_results, fullenrich_results, json):
    # Side-by-side view of the raw provider responses, one tab per provider.
    def _fmt(d):
        """Render *d* as a fenced, pretty-printed JSON markdown block."""
        return mo.md(f"```json\n{json.dumps(d, indent=2, default=str)}\n```")

    mo.vstack([
        mo.md("## Results"),
        mo.ui.tabs({
            "Apollo": _fmt(apollo_results),
            "PDL": _fmt(pdl_results),
            "FullEnrich": _fmt(fullenrich_results),
        }),
    ])
    return


@app.cell(hide_code=True)
def _(mo, apollo_results, pdl_results, fullenrich_results, json):
    # Download button for all provider results combined into one JSON file.
    _raw = json.dumps(
        {"apollo": apollo_results, "pdl": pdl_results, "fullenrich": fullenrich_results},
        indent=2,
        default=str,
    ).encode()
    mo.vstack([
        mo.md("## Export"),
        mo.download(_raw, filename="enrichment_raw.json", label="Download Raw JSON"),
    ])
    return


if __name__ == "__main__":
    app.run()