geoip/src/parse_delegations.py
"""
Preprocess IP address space delegations of the different registries into a common JSON format.
"""
import re
import sys
import csv
import json
from .utils import IPv4Range, Entry, REGISTRIES
from ipaddress import IPv4Address
from pathlib import Path
from typing import List, Iterator
def _parse_address_space(in_file: Path, source: str) -> Iterator[Entry]:
    """
    Main delegations to registries of whole /8 networks.
    https://www.iana.org/assignments/ipv4-address-space/ipv4-address-space.xhtml

    Reads ``in_file`` as CSV with a header row and yields one ``Entry`` per data row, built from
    the ``Prefix``, ``WHOIS`` and ``Status [1]`` columns.

    :param in_file: path of the CSV file to read
    :param source: registry tag; upper-cased and looked up in ``REGISTRIES`` for the entry source
    :return: iterator over the parsed entries, in file order
    :raises KeyError: if one of the columns above is missing from a row
    """
    # The csv module expects its input file to be opened with newline="" so that it can do the
    # line-ending handling itself (relevant for quoted fields that contain line breaks).
    with in_file.open("r", newline="") as in_fp:
        reader: csv.DictReader = csv.DictReader(in_fp)
        for row in reader:
            # "001/8" -> "1.0.0.0/8": drop up to two leading zeros and expand to a dotted network.
            prefix: IPv4Range = IPv4Range.from_mask_str(
                re.sub(r"^0{0,2}([0-9]+)/8$", r"\1.0.0.0/8", row["Prefix"])
            )
            # "whois.arin.net" -> "ARIN"; values not matching the pattern are only upper-cased.
            whois: str = re.sub(r"^whois\.([a-z]+)\.net$", r"\1", row["WHOIS"]).upper()
            status: str = row["Status [1]"].upper()
            yield Entry(
                # Rows with an empty WHOIS column use their status as the lookup key instead.
                country=REGISTRIES[whois or status],
                address_range=prefix,
                source=REGISTRIES[source.upper()],
            )
def _parse_delegated(in_file: Path, source: str) -> Iterator[Entry]:
    """
    Per-registry country delegations, as '|'-separated CSV.

    Yields one ``Entry`` for every "ipv4" record line. Lines with fewer than six fields, lines
    starting with '#', records of any other type and summary lines are skipped.

    :param in_file: path of the delegation file to read
    :param source: registry tag; upper-cased and looked up in ``REGISTRIES`` for the entry source
    :return: iterator over the parsed entries, in file order
    :raises ValueError: if the start address or the address count of a record cannot be parsed
    """
    with in_file.open("r") as in_fp:
        for line in in_fp:
            # Strip the line terminator first so that the field comparisons below do not depend
            # on whether the line (e.g. the last one in the file) ends with a newline.
            row: List[str] = line.rstrip("\r\n").split("|")
            if len(row) < 6 or row[0].startswith("#") or row[2] != "ipv4" or row[5] == "summary":
                continue
            address: IPv4Address = IPv4Address(row[3])
            # Number of addresses covered by the record, counted from ``address``.
            length: int = int(row[4])
            # Normalise case before comparing so that "zz" is handled like "ZZ": that code is
            # treated as "no country" and makes the entry fall back to the registry below.
            country: str = row[1].upper()
            if country == "ZZ":
                country = ""
            whois: str = row[0].upper()
            yield Entry(
                country=country or REGISTRIES[whois],
                # Inclusive range: the last address is start + count - 1.
                address_range=IPv4Range(address, address + length - 1),
                source=REGISTRIES[source.upper()],
            )
def main(in_format: str, source: str, in_file: Path, out_file: Path) -> int:
    """
    Parse ``in_file`` in the given format and write the resulting entries to ``out_file`` as JSON.

    :param in_format: "address-space" selects ``_parse_address_space``; any other value selects
        ``_parse_delegated``
    :param source: registry source tag passed through to the parser
    :param in_file: input file to parse
    :param out_file: JSON file to write; it is created or overwritten
    :return: always 0
    """
    if in_format == "address-space":
        entries: Iterator[Entry] = _parse_address_space(in_file, source)
    else:
        entries = _parse_delegated(in_file, source)
    # Consume the parser completely before opening the output file, so that a parse error cannot
    # leave behind a truncated or empty output file.
    records = [entry.to_dict() for entry in entries]
    with out_file.open("w") as out_fp:
        # indent=True is kept from the original call; json treats it as the integer 1.
        json.dump(records, out_fp, ensure_ascii=False, indent=True, sort_keys=False)
    return 0
if __name__ == "__main__":
    # Command line entry point: collect the four required options and hand them to main().
    from argparse import ArgumentParser

    cli = ArgumentParser(description=__doc__)
    cli.add_argument(
        "--format",
        required=True,
        choices=["delegated", "address-space"],
        help="input file format",
    )
    cli.add_argument(
        "--source",
        required=True,
        type=str,
        help="registry source tag to set in results",
    )
    cli.add_argument(
        "--in-file",
        required=True,
        type=Path,
        help="input file to parse",
    )
    cli.add_argument(
        "--out-file",
        required=True,
        type=Path,
        help="output JSON file",
    )
    options = cli.parse_args()

    # The return value of main() becomes the process exit status.
    exit_status = main(
        in_format=options.format,
        source=options.source,
        in_file=options.in_file,
        out_file=options.out_file,
    )
    sys.exit(exit_status)