import json
import os
import shutil
import tarfile
import tempfile
import time

import requests

OUT_DIR = os.getenv("OUT_DIR", "/data")
LICENSE_KEY = os.getenv("MAXMIND_LICENSE_KEY", "").strip()
PDB_API_KEY = os.getenv("PDB_API_KEY", "").strip()
PDB_BASE = os.getenv("PDB_BASE", "https://www.peeringdb.com")
INFO_TYPE = os.getenv("PDB_INFO_TYPE", "Educational/Research")
TIMEOUT = int(os.getenv("HTTP_TIMEOUT", "30"))
LIMIT = int(os.getenv("PDB_LIMIT", "250"))


def atomic_replace(src_path: str, dst_path: str) -> None:
    """Copy src_path next to dst_path, then swap it in with an atomic rename."""
    os.makedirs(os.path.dirname(dst_path), exist_ok=True)
    tmp = dst_path + ".tmp"
    shutil.copyfile(src_path, tmp)
    os.replace(tmp, dst_path)


def download_maxmind_mmdb() -> None:
    """Fetch the GeoLite2-ASN database and install it into OUT_DIR."""
    if not LICENSE_KEY:
        raise RuntimeError("MAXMIND_LICENSE_KEY missing")
    # Official GeoLite2 download mechanism via license_key + edition_id
    url = (
        "https://download.maxmind.com/app/geoip_download"
        f"?edition_id=GeoLite2-ASN&license_key={LICENSE_KEY}&suffix=tar.gz"
    )
    with tempfile.TemporaryDirectory() as td:
        tgz = os.path.join(td, "GeoLite2-ASN.tar.gz")
        r = requests.get(url, timeout=TIMEOUT)
        r.raise_for_status()
        with open(tgz, "wb") as f:
            f.write(r.content)
        # The archive contains a dated directory; locate the .mmdb inside it.
        mmdb_found = None
        with tarfile.open(tgz, "r:gz") as tar:
            for member in tar.getmembers():
                if member.name.endswith("GeoLite2-ASN.mmdb"):
                    tar.extract(member, path=td)
                    mmdb_found = os.path.join(td, member.name)
                    break
        if not mmdb_found or not os.path.exists(mmdb_found):
            raise RuntimeError("GeoLite2-ASN.mmdb not found in archive")
        atomic_replace(mmdb_found, os.path.join(OUT_DIR, "GeoLite2-ASN.mmdb"))


def pdb_headers() -> dict:
    if not PDB_API_KEY:
        return {"Accept": "application/json"}
    # PeeringDB API key (optional; raises the authenticated rate limits)
    return {"Accept": "application/json", "Authorization": f"Api-Key {PDB_API_KEY}"}


def fetch_pdb_page(skip: int) -> list:
    """Fetch one page of /api/net objects filtered by info_type."""
    url = f"{PDB_BASE}/api/net"
    params = {
        "info_type": INFO_TYPE,
        "limit": LIMIT,
        "skip": skip,
        "fields": "asn,status,info_type",
    }
    r = requests.get(url, params=params, headers=pdb_headers(), timeout=TIMEOUT)
    r.raise_for_status()
    return r.json().get("data", [])


def update_nren_asns() -> None:
    """Collect all ASNs of matching PeeringDB networks and write them to a file."""
    asns = set()
    skip = 0
    while True:
        data = fetch_pdb_page(skip)
        for obj in data:
            if obj.get("status") != "ok":
                continue
            asn = obj.get("asn")
            if isinstance(asn, int) and asn > 0:
                asns.add(asn)
        if len(data) < LIMIT:
            break
        skip += LIMIT
        time.sleep(1.1)  # very conservative pacing between pages

    out_txt = os.path.join(OUT_DIR, "nren_asns.txt")
    # Write to a temp file in the same directory so os.replace stays atomic.
    with tempfile.NamedTemporaryFile("w", delete=False, dir=OUT_DIR) as f:
        for a in sorted(asns):
            f.write(f"{a}\n")
        tmp_path = f.name
    os.replace(tmp_path, out_txt)


def write_meta() -> None:
    meta = {
        "updated_at_unix": int(time.time()),
        "info_type": INFO_TYPE,
        "pdb_base": PDB_BASE,
    }
    with open(os.path.join(OUT_DIR, "metadata.json"), "w") as f:
        json.dump(meta, f, indent=2)


def main() -> None:
    os.makedirs(OUT_DIR, exist_ok=True)
    download_maxmind_mmdb()
    update_nren_asns()
    write_meta()
    print("[ok] updated mmdb + nren_asns")


if __name__ == "__main__":
    main()
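
# Usage sketch (the file name below is hypothetical; assumes `requests` is
# installed and the environment variables read at the top of this script are
# exported before running):
#
#   export MAXMIND_LICENSE_KEY=...   # required for the GeoLite2 download
#   export PDB_API_KEY=...           # optional PeeringDB API key
#   export OUT_DIR=/data
#   python update_geo_feeds.py
#
# On success, OUT_DIR contains GeoLite2-ASN.mmdb, nren_asns.txt, and
# metadata.json; both data files are swapped in via atomic renames, so
# readers never see a partially written file.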