# geoip/src/parse_geofeeds.py
"""
Parse downloaded geofeed CSV files into ranges and countries.
"""
import csv
import json
import re
import sys
from ipaddress import IPv4Address
from pathlib import Path
from typing import List, Iterator
from .utils import Geofeed, Entry, IPv4Range
def _parse_csv(geofeed: Path) -> Iterator[Entry]:
    """
    Yield one entry per usable line of a downloaded geofeed CSV file.

    Only lines that start with a run of digits and dots (optionally followed
    by ``/<digits>``), a comma, an upper-case country code and another comma
    are handed to the CSV parser; every other line is skipped. Rows for which
    building the entry raises ``ValueError`` are skipped as well, so a single
    bad record never aborts the whole feed.

    :param geofeed: path of the geofeed CSV file to read
    :return: iterator of entries whose source is ``"GEOFEED"``
    """
    # Compiled once per call instead of being looked up for every line.
    wanted = re.compile(r"^[0-9.]+(/[0-9]+)?,[A-Z]+,")
    # Geofeeds are UTF-8 (RFC 8805), so do not rely on the locale's encoding.
    # "utf-8-sig" also strips a leading byte order mark, which would otherwise
    # stop the first record from matching the pattern above. Undecodable bytes
    # are still replaced rather than treated as fatal.
    with geofeed.open("r", encoding="utf-8-sig", errors="replace") as in_fp:
        for row in csv.reader(line for line in in_fp if wanted.match(line)):
            try:
                entry = Entry(address_range=IPv4Range.from_mask_str(row[0], strict=False),
                              country=row[1],
                              source="GEOFEED")
            except ValueError:
                # Best effort: ignore records that cannot be turned into an entry.
                continue
            # Yield outside the try so only entry construction is guarded.
            yield entry
def _parse(feed: Geofeed) -> Iterator[Entry]:
    """
    Yield the entries of a feed's CSV file, clipped to the feed's own range.

    Every entry read from ``feed.geofeed`` is intersected with
    ``feed.address_range``; entries that do not overlap it are dropped.

    :param feed: geofeed description providing the CSV path and its range
    :return: iterator of entries restricted to ``feed.address_range``
    """
    csv_path = Path(feed.geofeed)
    for parsed in _parse_csv(csv_path):
        claimed = parsed.address_range
        allowed = feed.address_range
        # The intersection runs from the later start to the earlier end.
        lower: IPv4Address = max(claimed.start, allowed.start)
        upper: IPv4Address = min(claimed.end, allowed.end)
        if upper < lower:
            # Empty intersection: the entry lies outside the feed's range.
            continue
        yield Entry(
            address_range=IPv4Range(lower, upper),
            country=parsed.country,
            source=parsed.source,
        )
def main(in_file: Path, out_file: Path) -> int:
    """
    Convert the geofeeds listed in a JSON file into a JSON list of entries.

    The value loaded from ``in_file`` is iterated and every element is turned
    into a ``Geofeed`` via ``Geofeed.from_dict``. Each feed is parsed, its
    entries are de-duplicated and sorted by address range, and the serialized
    entries of all feeds are written to ``out_file`` in feed order.

    :param in_file: JSON file describing the geofeeds to parse
    :param out_file: JSON file that receives the serialized entries
    :return: exit status, ``0`` whenever the function returns normally
    """
    # JSON is UTF-8; do not depend on the locale's preferred encoding.
    with in_file.open("r", encoding="utf-8") as in_fp:
        feeds: List[Geofeed] = [Geofeed.from_dict(_) for _ in json.load(in_fp)]

    # Parse everything before opening the output, so a failure while reading
    # a feed does not leave a truncated result file behind.
    records = [entry.to_dict()
               for feed in feeds
               for entry in sorted(set(_parse(feed)), key=lambda _: _.address_range)]

    # ensure_ascii=False writes non-ASCII characters verbatim, so the file
    # needs an explicit encoding; otherwise the result depends on the locale
    # and may raise UnicodeEncodeError.
    with out_file.open("w", encoding="utf-8") as out_fp:
        # indent=1: one space per nesting level.
        json.dump(records, out_fp, ensure_ascii=False, indent=1, sort_keys=False)
    return 0
if __name__ == "__main__":
    # Command-line entry point: both file locations are mandatory options.
    import argparse

    cli = argparse.ArgumentParser(description=__doc__)
    for flag, help_text in (("--in-file", "input JSON file"),
                            ("--out-file", "output JSON file")):
        cli.add_argument(flag, type=Path, required=True, help=help_text)
    options = cli.parse_args()
    # The value returned by main() becomes the process exit status.
    sys.exit(main(options.in_file, options.out_file))