feat: enrich /lookup with university domain list check

Add a second detection path alongside ASN lookup: a self-maintained
list of university domains (uni_domains.txt) loaded at startup.

- New /lookup params: email= (extracts domain from address), domain= unchanged
- Suffix matching: insti.uni-stuttgart.de matches list entry uni-stuttgart.de
  without false-positives (evil-uni-stuttgart.de does not match)
- New response fields: asn_match, domain_match, matched_domain (omitempty)
- nren remains true if either asn_match OR domain_match is true (backwards compat)
- /healthz now returns JSON body: {"asn_count":N,"domain_count":N}
- asn-updater: new update_uni_domains() merges hs-kompass.de TSV + Hipo JSON
  (configurable via UNI_DOMAIN_COUNTRIES / HS_KOMPASS_URL env vars)
- 7 new tests; all existing tests pass unchanged

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-17 15:10:49 +01:00
parent 15476898c3
commit 082ecc579a
7 changed files with 435 additions and 60 deletions

View File

@@ -1,4 +1,5 @@
import os, time, json, tarfile, tempfile, shutil
from urllib.parse import urlparse
import requests
OUT_DIR = os.getenv("OUT_DIR", "/data")
@@ -8,6 +9,8 @@ PDB_BASE = os.getenv("PDB_BASE", "https://www.peeringdb.com")
INFO_TYPE = os.getenv("PDB_INFO_TYPE", "Educational/Research")
TIMEOUT = int(os.getenv("HTTP_TIMEOUT", "30"))
LIMIT = int(os.getenv("PDB_LIMIT", "250"))
HS_KOMPASS_URL = os.getenv("HS_KOMPASS_URL", "https://hs-kompass.de/kompass/xml/download/hs_liste.txt")
UNI_DOMAIN_COUNTRIES = os.getenv("UNI_DOMAIN_COUNTRIES", "DE,AT")
def atomic_replace(src_path: str, dst_path: str) -> None:
os.makedirs(os.path.dirname(dst_path), exist_ok=True)
@@ -112,6 +115,77 @@ def update_nren_asns() -> str:
os.chmod(out_txt, 0o644)
return used_info_type
def update_uni_domains() -> int:
country_set = {c.strip() for c in UNI_DOMAIN_COUNTRIES.split(",")}
hs_domains: set = set()
try:
r = requests.get(HS_KOMPASS_URL, timeout=TIMEOUT)
r.raise_for_status()
for line in r.text.splitlines():
parts = line.split("\t")
if len(parts) <= 20:
continue
homepage = parts[20].strip()
if not homepage:
continue
try:
if not homepage.startswith(("http://", "https://")):
homepage = "http://" + homepage
parsed = urlparse(homepage)
hostname = (parsed.hostname or "").lower()
if hostname.startswith("www."):
hostname = hostname[4:]
if hostname:
hs_domains.add(hostname)
except Exception:
continue
except Exception as err:
print(f"[warn] hs-kompass fetch failed: {err}")
hipo_domains: set = set()
try:
r = requests.get(
"https://raw.githubusercontent.com/Hipo/university-domains-list/master/world_universities_and_domains.json",
timeout=TIMEOUT,
)
r.raise_for_status()
for entry in r.json():
if entry.get("alpha_two_code") in country_set:
for d in entry.get("domains", []):
hipo_domains.add(d.lower().strip())
except Exception as err:
print(f"[warn] hipo fetch failed: {err}")
if len(hs_domains) == 0 or len(hipo_domains) == 0:
print(f"[warn] uni_domains: hs_kompass={len(hs_domains)} hipo={len(hipo_domains)}")
merged = hs_domains | hipo_domains
if len(merged) == 0:
print("[warn] uni_domains update produced 0 entries — skipping write to preserve existing file")
return 0
out_txt = os.path.join(OUT_DIR, "uni_domains.txt")
with tempfile.NamedTemporaryFile("w", delete=False, dir=OUT_DIR) as f:
for d in sorted(merged):
f.write(f"{d}\n")
tmp_path = f.name
os.replace(tmp_path, out_txt)
os.chmod(out_txt, 0o644)
meta = {
"hs_kompass": len(hs_domains),
"hipo": len(hipo_domains),
"total": len(merged),
"updated_at_unix": int(time.time()),
}
meta_path = os.path.join(OUT_DIR, "uni_domains_meta.json")
with open(meta_path, "w") as f:
json.dump(meta, f)
os.chmod(meta_path, 0o644)
return len(merged)
def write_meta(info_type: str):
meta = {
"updated_at_unix": int(time.time()),
@@ -126,8 +200,9 @@ def main():
os.makedirs(OUT_DIR, exist_ok=True)
download_maxmind_mmdb()
used_info_type = update_nren_asns()
domain_count = update_uni_domains()
write_meta(used_info_type)
print("[ok] updated mmdb + nren_asns")
print(f"[ok] updated mmdb + nren_asns + uni_domains (domain_count={domain_count})")
if __name__ == "__main__":
main()