# geoip/src/dump_csv.py
"""
Write the results from previous steps, i.e. per-country networks and GeoName ids, as MaxMind-compatible CSV database.
"""
import sys
import json
import csv
from .utils import REGISTRY_CONTINENTS
from ipaddress import IPv4Address
from pathlib import Path
from typing import List, Dict, Tuple
def _location_lookup(locations: Dict[Tuple[str, str], Dict[str, str]],
                     country: str) -> Tuple[str, Dict[str, str], Dict[str, str]]:
    """Resolve a country code against the location table.

    ``locations`` is keyed by ``("", country_code)`` for countries and by ``(other_code, "")`` for the
    non-country entries (e.g., continents) that serve as fallback.

    Returns a triple of

    * the code that was finally used (the given one, or its translation through ``REGISTRY_CONTINENTS``,
      or ``"ZZ"`` if there is no non-empty translation),
    * the actual location of the country (empty dict if the country itself is unknown),
    * the location to fall back to: identical to the actual location if the country is known, otherwise
      the entry for the translated code (empty dict if that is unknown, too).
    """
    direct_key = ("", country)
    if direct_key in locations:
        match = locations[direct_key]
        return country, match, match

    # Not a known country: map it to its replacement code, with "ZZ" as last resort.
    translated = REGISTRY_CONTINENTS.get(country, "") or "ZZ"
    return translated, {}, locations.get((translated, ""), {})
def main(out_format: str, location_file: Path, in_file: Path, out_file: Path) -> int:
    """Write the merged per-country networks as CSV database in the requested layout.

    Args:
        out_format: One of ``"geoip2"``, ``"legacy"``, ``"net"`` or ``"range"``.
        location_file: CSV with a header row providing the columns ``continent_code``, ``continent_name``,
            ``country_iso_code`` and ``country_name``; ``geoname_id`` is used if present.
        in_file: JSON list of objects. Every object needs ``country`` plus ``network`` (geoip2, net) or
            ``start`` and ``end`` (legacy, range).
        out_file: Destination CSV; an existing file is overwritten.

    Returns:
        Always 0, to be used as process exit code.

    Raises:
        ValueError: If ``out_format`` is not supported. This is checked before any file is opened, so
            ``out_file`` is left untouched in that case.
    """
    if out_format not in ("geoip2", "legacy", "net", "range"):
        # An unknown format used to fall through all branches, leaving an empty out_file and returning 0.
        raise ValueError(f"unsupported output format: {out_format!r}")

    # Country rows are stored as ("", country_iso_code), all other rows as (continent_code, ""). The "ZZ"
    # entry is built in and serves codes that resolve to nothing else.
    locations: Dict[Tuple[str, str], Dict[str, str]] = {("ZZ", ""): {"code": "ZZ", "name": "Reserved"}}
    # newline="" is what the csv module asks for; the explicit encoding keeps non-ASCII names independent
    # of the locale.
    with location_file.open("r", encoding="utf-8", newline="") as in_fp:
        for row in csv.DictReader(in_fp):
            country_code = row["country_iso_code"]
            continent_code = row["continent_code"]
            # Unified columns, so the writers below need not care whether a row is a country or a continent.
            row["code"] = country_code or continent_code
            row["name"] = row["country_name"] or row["continent_name"]
            locations[("", country_code) if country_code else (continent_code, "")] = row

    with in_file.open("r", encoding="utf-8") as in_fp:
        in_data: List[Dict[str, str]] = json.load(in_fp)

    # newline="" prevents the platform from translating the explicit "\n" line terminator.
    with out_file.open("w", encoding="utf-8", newline="") as out_fp:
        if out_format == "geoip2":
            writer = csv.DictWriter(out_fp,
                                    fieldnames=["network", "geoname_id",
                                                "registered_country_geoname_id",
                                                "represented_country_geoname_id",
                                                "is_anonymous_proxy", "is_satellite_provider"],
                                    quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
            writer.writeheader()
            for row in in_data:
                _, location, fallback_location = _location_lookup(locations, row["country"])
                # "represented_country_geoname_id" is never set; DictWriter emits it as empty field.
                writer.writerow({
                    "network": row["network"],
                    "geoname_id": fallback_location.get("geoname_id", ""),
                    "registered_country_geoname_id": location.get("geoname_id", ""),
                    "is_anonymous_proxy": "0",
                    "is_satellite_provider": "0",
                })
        elif out_format == "legacy":
            # No header row and every field quoted.
            writer = csv.DictWriter(out_fp,
                                    fieldnames=["start", "end", "start_int", "end_int", "country", "country_name"],
                                    quoting=csv.QUOTE_ALL, lineterminator="\n")
            for row in in_data:
                country, location, _ = _location_lookup(locations, row["country"])
                if not location:
                    # Cannot use continent, region, or reserved codes as all are hardcoded in the library.
                    continue
                # The name set here will get ignored by the library, too.
                writer.writerow({
                    "start": row["start"],
                    "end": row["end"],
                    "start_int": int(IPv4Address(row["start"])),
                    "end_int": int(IPv4Address(row["end"])),
                    "country": country,
                    "country_name": location.get("country_name", ""),
                })
        elif out_format == "net":
            writer = csv.DictWriter(out_fp,
                                    fieldnames=["network", "country", "country_code", "country_name"],
                                    quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
            writer.writeheader()
            for row in in_data:
                _, location, fallback_location = _location_lookup(locations, row["country"])
                # "country" and "country_name" may hold the fallback, "country_code" only a real country.
                writer.writerow({
                    "network": row["network"],
                    "country": fallback_location.get("code", ""),
                    "country_code": location.get("code", ""),
                    "country_name": fallback_location.get("name", ""),
                })
        elif out_format == "range":
            writer = csv.DictWriter(out_fp,
                                    fieldnames=["start", "end", "country", "country_code", "country_name"],
                                    quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
            writer.writeheader()
            for row in in_data:
                _, location, fallback_location = _location_lookup(locations, row["country"])
                # "country" and "country_name" may hold the fallback, "country_code" only a real country.
                writer.writerow({
                    "start": row["start"],
                    "end": row["end"],
                    "country": fallback_location.get("code", ""),
                    "country_code": location.get("code", ""),
                    "country_name": fallback_location.get("name", ""),
                })
    return 0
if __name__ == "__main__":
    from argparse import ArgumentParser

    # Command-line entry point: all four options are mandatory, the module docstring is the description.
    parser = ArgumentParser(description=__doc__)
    parser.add_argument(
        "--format",
        required=True,
        choices=["range", "net", "legacy", "geoip2"],
        help="output format",
    )
    # The remaining options are all paths and differ in name and help text only.
    for option, description in (
            ("--location-file", "location file as GeoName CSV"),
            ("--in-file", "merged country/network JSON"),
            ("--out-file", "result CSV"),
    ):
        parser.add_argument(option, type=Path, required=True, help=description)

    args = parser.parse_args()
    # The return value of main() becomes the exit status of the process.
    exit_status = main(args.format, args.location_file, args.in_file, args.out_file)
    sys.exit(exit_status)