# geoip/src/parse_whois.py
"""
Parse whois dumps for geofeed URLs into JSON format.
"""
import gzip
import json
import re
import sys
from pathlib import Path
from urllib.parse import urlparse, urlunparse
from typing import Iterator, IO, List, Tuple
from .utils import IPv4Range, Geofeed
# Heuristic for a geofeed *declaration* line.  A line matches when, after an
# optional run of non-letters or a lowercase "word: " prefix, it starts with
# "Geo"/"geo" (optionally followed by "feed", "feeds" or "location") and ends
# with a space followed by an http(s) URL that runs to the end of the line.
# The URL is the only capture group.  The plural "s" is optional so that the
# canonical "Geofeed <URL>" form is recognised as well as "Geofeeds <URL>".
_geofeed_dcl_re: re.Pattern = re.compile(
    r"^(?:[^a-zA-Z]*|[a-z]+: +)[Gg]eo(?:[Ff]eeds?|[Ll]ocation)?(?:\b.*)? (https?://.*)$",
    re.MULTILINE)

# Heuristic for a bare URL at the end of a line that itself names a geofeed:
# an http(s) URL containing a lowercase word ending in "geofeed"/"geofeeds".
# The whole URL is the only capture group.
_geofeed_url_re: re.Pattern = re.compile(
    r"\b(https?://\S*\b[a-z]*geofeeds?\b\S*)$",
    re.MULTILINE)
def _parse_whois_entry(lines: List[str]) -> List[Tuple[str, str]]:
    """Turn the lines of one whois block into an ordered list of (key, value) pairs.

    Every line is cut at its first colon.  The key is kept verbatim, the value
    has its leading whitespace removed.  Lines without any colon are dropped.
    Repeated keys are kept, in input order, which makes the result a multidict.
    """
    pairs: List[Tuple[str, str]] = []
    for line in lines:
        key, colon, value = line.partition(":")
        if colon:  # empty separator means the line had no colon at all
            pairs.append((key, value.lstrip()))
    return pairs
def _parse_route(r: str) -> IPv4Range:
    """Build an ``IPv4Range`` from either range or netmask notation.

    A string containing ``-`` is handed to ``IPv4Range.from_range_str``.
    Anything else is treated as netmask notation and handed to
    ``IPv4Range.from_mask_str`` after abbreviated addresses such as
    ``24.152.0/22`` have been padded to four octets (``24.152.0.0/22``).
    """
    if "-" in r:
        return IPv4Range.from_range_str(r)

    # One ".0" is inserted before the slash for every octet short of four;
    # a non-positive count multiplies to the empty string, i.e. no padding.
    missing_octets = 3 - r.count(".")
    padded = r.replace("/", ".0" * missing_octets + "/")
    return IPv4Range.from_mask_str(padded)
def _parse_for_geofeed(entry: List[Tuple[str, str]]) -> Iterator[Geofeed]:
    """Yield a ``Geofeed`` for every (address range, feed URL) pair of one whois entry.

    Address ranges come from the ``route`` and ``inetnum`` keys.  Feed URLs are
    collected heuristically:

    * ``remarks`` and ``descr`` values are searched for geofeed declarations,
    * ``remarks`` values are additionally searched for URLs naming a geofeed,
    * a ``geofeed`` value is taken as-is.

    URLs are normalised through ``urlparse``/``urlunparse`` and de-duplicated;
    parsed ranges are de-duplicated as well.  Nothing is yielded unless the
    entry has at least one range and one feed.  Ranges are only parsed when a
    feed was found, so entries without feeds never reach ``_parse_route``.
    """
    ranges: List[str] = []
    feeds: List[str] = []
    for key, value in entry:
        if key in ("route", "inetnum"):
            ranges.append(value)
        elif key in ("remarks", "descr"):
            # The declaration pattern captures up to end of line, so trailing
            # quotes and spaces are trimmed off the URL.
            feeds.extend(url.rstrip("\" ") for url in _geofeed_dcl_re.findall(value))
            if key == "remarks":
                feeds.extend(_geofeed_url_re.findall(value))
        elif key == "geofeed":
            feeds.append(value)

    unique_feeds = {urlunparse(urlparse(url)) for url in feeds}
    if not unique_feeds:
        return

    # Parse the ranges once for the whole entry instead of once per feed.
    unique_ranges = {_parse_route(text) for text in ranges}
    for feed in unique_feeds:
        for address_range in unique_ranges:
            yield Geofeed(address_range=address_range, geofeed=feed)
def _parse_whois_stream(in_file: IO[str]) -> Iterator[Geofeed]:
    """Split a whois text stream into blocks and yield the geofeeds found in each.

    Lines are right-stripped first.  An empty line or a line starting with
    ``#`` closes the current block.  A line starting with a space continues
    the previous line of the block (joined with a newline, leading whitespace
    removed).  Every other line starts a new line of the block.  The block
    still open at end of input is processed as well.
    """
    # NOTE(review): only space-prefixed continuation lines are recognised;
    # RPSL also allows tab and "+" as continuation markers - confirm whether
    # the dumps consumed here ever use them.
    block_lines: List[str] = []
    for raw_line in in_file:
        text = raw_line.rstrip()
        if text == "" or text.startswith("#"):
            # Block boundary: flush what has been collected so far.
            yield from _parse_for_geofeed(_parse_whois_entry(block_lines))
            block_lines = []
            continue
        if block_lines and text.startswith(" "):
            block_lines[-1] += "\n" + text.lstrip()
            continue
        block_lines.append(text)
    yield from _parse_for_geofeed(_parse_whois_entry(block_lines))
def main(in_file: Path, out_file: Path) -> int:
    """Parse the gzip-compressed whois dump ``in_file`` and write geofeeds to ``out_file``.

    The dump is read as UTF-8 text; undecodable bytes are replaced rather than
    raising.  The geofeeds found are de-duplicated, sorted by
    ``(address_range, geofeed)`` and written as a JSON list of their
    ``to_dict()`` forms.

    The output is written as UTF-8 explicitly: ``ensure_ascii=False`` emits
    non-ASCII characters verbatim, so relying on the locale's default encoding
    could fail or produce non-UTF-8 JSON.

    Returns 0, intended as the process exit status.
    """
    with gzip.open(in_file, "rt", encoding="utf-8", errors="replace", newline="\n") as in_fp:
        with out_file.open("w", encoding="utf-8") as out_fp:
            # sorted, but not merged
            geofeeds = sorted(set(_parse_whois_stream(in_fp)),
                              key=lambda feed: (feed.address_range, feed.geofeed))
            # indent=True is handled as an indent width of 1 by the json module.
            json.dump([feed.to_dict() for feed in geofeeds],
                      out_fp,
                      ensure_ascii=False, indent=True, sort_keys=False)
    return 0
if __name__ == "__main__":
    # Command-line entry point: both paths are mandatory options.
    from argparse import ArgumentParser

    cli = ArgumentParser(description=__doc__)
    for flag, help_text in (
        ("--in-file", "input whois gzip file to parse"),
        ("--out-file", "output JSON file"),
    ):
        cli.add_argument(flag, type=Path, required=True, help=help_text)
    options = cli.parse_args()
    # main()'s return value becomes the process exit status.
    sys.exit(main(options.in_file, options.out_file))