geoip/src/fetch_geofeeds.py
"""
Try to download all found geofeed URLs.
"""
import json
import sys
from concurrent.futures import ThreadPoolExecutor
from hashlib import md5
from http.client import HTTPException, HTTPResponse
from pathlib import Path
from random import shuffle
from typing import List, Optional
from urllib import request
from urllib.error import HTTPError, URLError
from urllib.parse import urlparse

from .utils import Geofeed
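
# The Geofeed helper from .utils is assumed to look roughly like the dataclass
# sketched below; this is inferred from how it is used in this file, not its
# actual definition:
#
#   @dataclass
#   class Geofeed:
#       address_range: str
#       geofeed: str  # feed URL on input, local CSV path after fetching
#
#       @classmethod
#       def from_dict(cls, d: dict) -> "Geofeed": ...
#       def to_dict(self) -> dict: ...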


class Fetcher:
    """Download geofeed CSVs into a content-addressed cache directory."""

    def __init__(self, out_dir: Path) -> None:
        self._out_dir: Path = out_dir
        self._timeout: float = 30.0

    def fetch(self, feed: Geofeed) -> Geofeed:
        """Thread worker: download one feed and return it with a local file path."""
        filename: Path = self._build_filename(feed.geofeed)
        try:
            # Exclusive-create mode ("xb") ensures each URL is downloaded at
            # most once, even with several workers running concurrently.
            with filename.open("xb") as out_fp:
                data: Optional[bytes] = self._fetch_url(feed.geofeed)
                if data is not None:
                    out_fp.write(data)
        except FileExistsError:  # already cached or claimed by another worker
            pass
        return Geofeed(address_range=feed.address_range, geofeed=filename.as_posix())

    def _build_filename(self, url: str) -> Path:
        """Map a URL to a stable cache filename (md5 is used for naming, not security)."""
        key: str = md5(url.encode("utf-8", errors="strict")).hexdigest()  # nosec
        return self._out_dir / f"geofeed-{key}.csv"

    def _fetch_url(self, url: str) -> Optional[bytes]:
        """Fetch a URL, returning the body on a 2xx response and None otherwise."""
        if urlparse(url).scheme not in {"http", "https"}:
            print(f"{url} -> Scheme not supported")
            return None
        try:
            data: Optional[bytes] = None
            conn: HTTPResponse
            with request.urlopen(url, timeout=self._timeout) as conn:  # nosec
                if 200 <= conn.status < 300:
                    data = conn.read()
                print(f"[{conn.status}] {url}")
                return data
        except HTTPError as e:
            print(f"[{e.code}] {url}")
            return None
        except URLError as e:
            print(f"[000] {url} - {e.reason}")
            return None
        except (HTTPException, OSError) as e:
            print(f"[000] {url} - {e}")
            return None
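

# Sketch of standalone Fetcher usage (the path and URL are hypothetical):
#
#   fetcher = Fetcher(Path("geofeeds"))
#   result = fetcher.fetch(Geofeed(address_range="192.0.2.0/24",
#                                  geofeed="https://example.com/geofeed.csv"))
#   # result.geofeed now points at geofeeds/geofeed-<md5-of-url>.csv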


def main(in_files: List[Path], out_file: Path, out_dir: Path) -> int:
    feeds: List[Geofeed] = []
    for in_file in in_files:
        with in_file.open("r") as in_fp:
            feeds.extend(Geofeed.from_dict(_) for _ in json.load(in_fp))
    # Shuffle so consecutive requests are spread across hosts instead of
    # hitting every feed of one publisher back to back.
    shuffle(feeds)
    fetcher: Fetcher = Fetcher(out_dir)
    with ThreadPoolExecutor(max_workers=4) as executor:
        results: List[Geofeed] = list(executor.map(fetcher.fetch, feeds))
    with out_file.open("w") as out_fp:
        json.dump([_.to_dict() for _ in results],
                  out_fp,
                  ensure_ascii=False, indent=2, sort_keys=False)
    return 0


if __name__ == "__main__":
    from argparse import ArgumentParser

    parser = ArgumentParser(description=__doc__)
    parser.add_argument("--in-files", type=Path, nargs="+", required=True, help="input whois JSON files")
    parser.add_argument("--out-file", type=Path, required=True, help="output JSON file")
    parser.add_argument("--out-dir", type=Path, required=True, help="output geofeed CSV directory")
    args = parser.parse_args()
    sys.exit(main(args.in_files, args.out_file, args.out_dir))
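
# Example invocation (a sketch; the relative import of .utils means this file
# must be run as a module, and the file names below are hypothetical):
#
#   python -m geoip.src.fetch_geofeeds \
#       --in-files whois-ripe.json whois-arin.json \
#       --out-file fetched-feeds.json \
#       --out-dir geofeeds/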