# geoip/src/parse_transfers.py
"""
Preprocess and strip down registry transfer logs into a common JSON format.
"""
import json
import re
import sys
from datetime import datetime
from itertools import chain
from pathlib import Path
from typing import Iterator, List, Optional
from .utils import IPv4Range, REGISTRIES, Transfer
def _sanitize_address(address: str) -> str:
"""IPv4Address: Leading zeros are not permitted"""
return re.sub(r"^0*([0-9]+)\.0*([0-9]+)\.0*([0-9]+)\.0*([0-9]+)$", r"\1.\2.\3.\4", address)
def _parse_datetime(transfer_date: str) -> int:
if transfer_date.endswith("Z"):
transfer_date = transfer_date[:-1] + "UTC"
for fmt in ["%Y-%m-%dT%H:%M:%S%Z", "%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%d %H:%M:%S.000%z"]:
try:
return round(datetime.strptime(transfer_date, fmt).timestamp() * 1000)
except ValueError:
pass
raise ValueError(f"Cannot parse timestamp '{transfer_date}'")
def _deduplicate_transfers(transfers: Iterator[Transfer]) -> Iterator[Transfer]:
    """
    Drop immediate repeats of the same transfer.

    The input is consumed completely and sorted by (address range, timestamp); the
    sort is stable, so ties keep their input order. A transfer is emitted unless its
    address range, source and recipient all equal those of the transfer directly
    before it in that order. A range that moves A -> B and later back B -> A is
    therefore kept in full: back-transfers are not duplicates.
    """
    def _sort_key(item):
        return item.address_range, item.timestamp

    previous: Optional[Transfer] = None
    for current in sorted(transfers, key=_sort_key):
        repeats_previous = (
            previous is not None
            and current.address_range == previous.address_range
            and current.source == previous.source
            and current.recipient == previous.recipient
        )
        # A repeat matches its predecessor on every compared field, so tracking the
        # previous item is equivalent to tracking the last emitted one.
        previous = current
        if not repeats_previous:
            yield current
def _parse_transfers(in_file: Path) -> Iterator[Transfer]:
    """
    Yield one Transfer per IPv4 range moved between two different registries.

    Reads the "transfers" list of the JSON document in ``in_file``. Entries are
    skipped when their "type" is not "RESOURCE_TRANSFER", when they carry no
    "ip4nets", or when source and recipient registry are the same. "ip4nets" may be
    a single object or a list of objects; each holds a "transfer_set" list whose
    items provide "start_address" and "end_address".

    :raises KeyError: if an expected key is missing or a registry name is not in
        REGISTRIES
    :raises ValueError: if "transfer_date" cannot be parsed
    """
    # JSON interchange is UTF-8; do not let the platform's locale encoding decide.
    with in_file.open("r", encoding="utf-8") as in_fp:
        entries = json.load(in_fp)["transfers"]

    # The document is fully loaded, so the file is already closed while we iterate.
    for entry in entries:
        if entry["type"] != "RESOURCE_TRANSFER" or "ip4nets" not in entry:
            continue
        transfer_date: int = _parse_datetime(entry["transfer_date"])
        source_rir = REGISTRIES[entry["source_rir"]]
        recipient_rir = REGISTRIES[entry["recipient_rir"]]
        if source_rir == recipient_rir:  # remove noops for our use-case early
            continue

        ip4nets = entry["ip4nets"]
        transfer_sets = ip4nets if isinstance(ip4nets, list) else [ip4nets]
        for transfer_set in transfer_sets:
            for transfer in transfer_set["transfer_set"]:
                # Addresses may carry leading zeros, which must be removed before parsing.
                address_range = IPv4Range.from_str(
                    _sanitize_address(transfer["start_address"]),
                    _sanitize_address(transfer["end_address"]),
                )
                yield Transfer(
                    address_range=address_range,
                    source=source_rir,
                    recipient=recipient_rir,
                    timestamp=transfer_date,
                )
def main(in_files: List[Path], out_file: Path) -> int:
    """
    Merge the transfers of all input files, deduplicate them and write them as JSON.

    :param in_files: registry transfer JSON files to read
    :param out_file: destination of the resulting JSON list
    :return: process exit code, always 0 (errors propagate as exceptions)
    """
    # Parse everything before touching the output: opening it for writing truncates
    # it, which would destroy an existing result if parsing failed, or an input file
    # if it is also named as the output.
    parsed = chain.from_iterable(_parse_transfers(in_file) for in_file in in_files)
    records = [transfer.to_dict() for transfer in _deduplicate_transfers(parsed)]

    # ensure_ascii=False emits non-ASCII characters verbatim, so the encoding is fixed
    # explicitly instead of relying on the platform's locale encoding.
    with out_file.open("w", encoding="utf-8") as out_fp:
        # indent=True is kept as-is; json treats it as an indent width of 1.
        json.dump(records, out_fp, ensure_ascii=False, indent=True, sort_keys=False)
    return 0
if __name__ == "__main__":
    from argparse import ArgumentParser

    # Command-line entry point: one or more input files are merged into one output file.
    cli = ArgumentParser(description=__doc__)
    cli.add_argument(
        "--in-files",
        nargs='+',
        required=True,
        type=Path,
        help="input registry transfer JSON files",
    )
    cli.add_argument(
        "--out-file",
        required=True,
        type=Path,
        help="output JSON file",
    )
    options = cli.parse_args()
    exit_code = main(options.in_files, options.out_file)
    sys.exit(exit_code)