#!/usr/bin/env python3

"""
Watch git repositories and trigger arbitrary pipelines such as docker build and run commands.
Light on runtime resources and configuration overhead.
"""

import argparse
import asyncio
import configparser
import gc
import json
import logging
import os
import re
import signal
import sys
from collections import defaultdict, UserDict
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, asdict
from enum import IntEnum
from locale import setlocale, LC_ALL
from logging.handlers import QueueHandler, QueueListener
from pathlib import Path
from queue import Queue
from string import Template
from typing import AsyncIterator, Iterator, Callable, TypeVar, Generic, Union, Optional, List, Tuple, Set, Dict, Any, \
    DefaultDict
from urllib.parse import quote as urlquote


# region Config
####


@dataclass(frozen=True)
class GitConfig:
    """Per-project git configuration."""
    remote: str
    shallow: bool = False


@dataclass(frozen=True)
class DockerConfig:
    """Per-project docker settings."""
    build: bool
    run: bool
    dockerfile: Path
    pull: bool
    cache: bool
    docker_in_docker: bool
    read_only: bool
    tmpfs: int  # percent
    tag: Optional[Template]
    label: Optional[Template]


@dataclass(frozen=True)
class RsyncConfig:
    """Per-project rsync settings."""
    destination: Optional[str]  # path or remote server if enabled
    delete: bool
    checksum: bool
    preserve: bool  # might need to run as root
    include: Optional[Path]


@dataclass(frozen=True)
class ProjectConfig:
    """Project configuration."""
    name: str
    enabled: bool
    remote: GitConfig
    branches: Optional[re.Pattern[str]]
    tags: Optional[re.Pattern[str]]
    env: Dict[str, str]
    docker: DockerConfig
    rsync: RsyncConfig
    timeout: Optional[float]
    prepare_callback: Optional[Path]
    success_callback: Optional[Path]
    error_callback: Optional[Path]

    def as_env(self) -> Dict[str, str]:
        return {**{"BUILD_PROJECT": self.name, "BUILD_REMOTE": self.remote.remote}, **self.env}


@dataclass(frozen=True)
class PipelineConfig:
    """Static configuration from commandline."""
    workdir: Path
    interval: int
    limit: int


class Config:
    """Parse ini file as list of project configurations."""

    class Parser(configparser.ConfigParser):
        T = TypeVar('T')
        V = TypeVar('V')

        def get_as(self, section: str, option: str, fallback: T, conv: Callable[[str], V]) -> Union[V, T]:
            val: Union[str, Config.Parser.T] = self.get(section, option, fallback=fallback)
            return conv(val) if isinstance(val, str) else val

    @classmethod
    def _parse_env(cls, assignments: str) -> Iterator[Tuple[str, str]]:
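        r"""Yield (name, value) pairs from newline-separated assignments, e.g.:

            >>> dict(Config._parse_env("A=1\n# comment\nB = two"))
            {'A': '1', 'B': 'two'}
        """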
        for line in assignments.splitlines(keepends=False):
            if "=" in line and not line.startswith(("#", ";")):
                var, val = line.split("=", maxsplit=1)
                yield var.strip(), val.strip()

    @classmethod
    def _parse_env_file(cls, filename: Optional[Path]) -> Iterator[Tuple[str, str]]:
        if filename is not None:
            yield from cls._parse_env(filename.read_text(encoding="utf-8", errors="strict"))

    @classmethod
    def _parse(cls, parser: Parser) -> Iterator[ProjectConfig]:
        sections: Set[str] = set()
        for section in parser.sections():
            if re.fullmatch(r"[a-zA-Z0-9][a-zA-Z0-9_.-]*", section) is None:
                raise ValueError(f"Invalid project section name '{section}'")
            elif section in sections:
                raise ValueError(f"Duplicate project section '{section}'")
            else:
                sections.add(section)

            try:
                yield ProjectConfig(
                    name=section,
                    enabled=parser.getboolean(section, "enabled", fallback=True),
                    remote=GitConfig(
                        remote=parser.get(section, "remote"),
                        shallow=parser.getboolean(section, "shallow", fallback=True)
                    ),
                    branches=parser.get_as(section, "branches", fallback=None, conv=re.compile),
                    tags=parser.get_as(section, "tags", fallback=None, conv=re.compile),
                    env={**dict(cls._parse_env_file(parser.get_as(section, "env_file", fallback=None, conv=Path))),
                         **dict(cls._parse_env(parser.get(section, "env", fallback="")))},
                    docker=DockerConfig(
                        build=parser.getboolean(section, "build", fallback=False),
                        run=parser.getboolean(section, "run",
                                              fallback=parser.getboolean(section, "build", fallback=False)),
                        dockerfile=parser.get_as(section, "dockerfile", fallback=Path("Dockerfile"), conv=Path),
                        pull=parser.getboolean(section, "pull", fallback=True),
                        cache=parser.getboolean(section, "cache", fallback=True),
                        docker_in_docker=parser.getboolean(section, "docker_in_docker", fallback=False),
                        read_only=parser.getboolean(section, "read_only", fallback=True),
                        tmpfs=max(0, min(50, parser.getint(section, "tmpfs", fallback=50))),
                        tag=parser.get_as(section, "tag", fallback=None, conv=Template),
                        label=parser.get_as(section, "label", fallback=None, conv=Template),
                    ),
                    rsync=RsyncConfig(
                        destination=parser.get(section, "rsync.destination", fallback=None),
                        delete=parser.getboolean(section, "rsync.delete", fallback=False),
                        checksum=parser.getboolean(section, "rsync.checksum", fallback=False),
                        preserve=parser.getboolean(section, "rsync.preserve", fallback=False),
                        include=parser.get_as(section, "rsync.include", fallback=None, conv=Path),
                    ),
                    timeout=parser.getfloat(section, "timeout", fallback=None),
                    prepare_callback=parser.get_as(section, "prepare_callback", fallback=None, conv=Path),
                    success_callback=parser.get_as(section, "success_callback", fallback=None, conv=Path),
                    error_callback=parser.get_as(section, "error_callback", fallback=None, conv=Path),
                )
            except (ValueError, re.error, configparser.Error, UnicodeError, OSError) as e:
                raise ValueError(f"Parsing error in '{section}': {str(e)}") from None

    @classmethod
    def from_ini(cls, filename: Path) -> Iterator[ProjectConfig]:
        try:
            parser: Config.Parser = Config.Parser()
            with filename.open("r") as fp:
                parser.read_file(fp)
        except (OSError, configparser.Error, UnicodeError, ValueError) as e:
            raise RuntimeError(f"Cannot read config '{filename}': {str(e)}") from None
        try:
            yield from cls._parse(parser)
        except ValueError as e:
            raise RuntimeError(f"Cannot parse config '{filename}': {str(e)}") from None


# endregion
# region Logging
####


class QueuedLogger:
    """Move logging i/o into a separate thread."""

    def __init__(self, logger: logging.Logger, handlers: List[logging.Handler]) -> None:
        self._logger = logger
        self._queue: Queue[Optional[logging.LogRecord]] = Queue()
        self._handler: logging.Handler = QueueHandler(self._queue)
        self._listener: QueueListener = QueueListener(self._queue, *handlers)
        self._waiter: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=1, thread_name_prefix=self.__class__.__name__)

    async def flush(self) -> None:
        await asyncio.wrap_future(self._waiter.submit(self._queue.join))

    async def __aenter__(self) -> logging.Logger:
        self._listener.start()
        self._logger.addHandler(self._handler)
        return self._logger

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        self._logger.removeHandler(self._handler)
        self._handler.close()

        await asyncio.wrap_future(self._waiter.submit(self._listener.stop))
        for h in self._listener.handlers:
            await asyncio.wrap_future(self._waiter.submit(h.close))
        self._listener.handlers = tuple()

        self._waiter.shutdown(wait=True)


class RootLogger(QueuedLogger):
    """Move queue between root logger and its handlers."""

    def __init__(self) -> None:
        logger: logging.Logger = logging.getLogger()
        handlers, logger.handlers = logger.handlers, []
        super().__init__(logger, handlers)


class BuildLogger(QueuedLogger):
    """Standalone queued logger to file."""

    def __init__(self, name: str, filename: Path) -> None:
        try:
            handler: logging.StreamHandler = logging.FileHandler(filename.as_posix(), delay=False, encoding="utf-8")
            handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
        except OSError as e:
            raise OSError(f"Cannot create build logger for '{filename}': {str(e)}") from None

        logger: logging.Logger = logging.Logger(f"build.{name}", level=logging.DEBUG)
        logger.propagate = False
        super().__init__(logger, [handler])


# endregion
# region Context
####


@dataclass(frozen=True)
class GitRef:
    """Project remote specific version to build."""

    ref: str
    commit: str

    def __post_init__(self) -> None:
        if not self.ref.startswith(("refs/heads/", "refs/tags/")):
            raise ValueError(self.ref)
        if re.fullmatch(r"[0-9a-f]{40}", self.commit) is None:  # full 40-char lowercase hex sha
            raise ValueError(self.commit)

    @property
    def short_commit(self) -> str:
        return self.commit[:8]  # NB: 7 is also common

    @property
    def branch(self) -> Optional[str]:
        return self.ref[11:] if self.ref.startswith("refs/heads/") else None

    @property
    def tag(self) -> Optional[str]:
        return self.ref[10:] if self.ref.startswith("refs/tags/") else None

    @property
    def branch_or_tag(self) -> str:
        return self.branch or self.tag or self.ref

    def as_dict(self) -> Dict[str, str]:
        return {"ref": self.ref, "commit": self.commit,
                "short_commit": self.short_commit, "branch": self.branch or "", "tag": self.tag or ""}

    def as_env(self) -> Dict[str, str]:
        return {f"BUILD_{k.upper()}": v for k, v in self.as_dict().items()}

    def __str__(self) -> str:
        return f"<{self.ref}:{self.short_commit}>"


class BuildContext:
    """Runnable instance of project, git ref, and path configuration."""

    def __init__(self, project: ProjectConfig, ref: GitRef, path: Path, target_path: Path) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__).getChild(project.name)
        self._project: ProjectConfig = project
        self._ref: GitRef = ref
        self._path: Path = path
        self._target_path: Path = target_path

    @property
    def project(self) -> ProjectConfig:
        return self._project

    @property
    def ref(self) -> GitRef:
        return self._ref

    @classmethod
    def quote_path(cls, p: str) -> str:
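        """Percent-encode a name into a single safe path segment, e.g. "refs/heads/main" -> "refs%2Fheads%2Fmain"."""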
        return urlquote(p, safe="")

    @classmethod
    def _ensure_dir(cls, p: Path) -> Path:
        try:
            p.mkdir(parents=False, exist_ok=True)
        except OSError as e:
            raise OSError(f"Cannot create '{p}': {str(e)}") from None
        else:
            return p

    @property
    def root_dir(self) -> Path:
        return self._path

    @property
    def build_dir(self) -> Path:
        return self._ensure_dir(self._path / "build")

    @property
    def drop_dir(self) -> Path:
        return self._ensure_dir(self._path / "drop")

    @property
    def logger(self) -> logging.Logger:
        return self._logger

    def get_build_logger(self) -> BuildLogger:
        """Open build logfile. Raises OSError."""
        return BuildLogger(self._project.name, self._path / "build.log")

    def finalize(self) -> None:
        """Mark build as successfully completed (by renaming to without the .tmp suffix)."""
        try:
            self._path.rename(self._target_path)
        except OSError as e:
            raise OSError(f"Cannot rename build directory to {self._target_path}: {str(e)}") from None
        else:
            self._logger.info(f"{self._path} -> {self._target_path}")

    def __str__(self) -> str:
        return f"<{self._path.as_posix()}>"


class ProjectContext:
    """Project that is watched for git updates, possibly creating a build context to run."""

    def __init__(self, project: ProjectConfig, root: Path) -> None:
        self._path: Path = root / BuildContext.quote_path(project.name)
        self._project: ProjectConfig = project
        self._ref_cache: DefaultDict[str, Dict[str, bool]] = defaultdict(dict)

    @property
    def project(self) -> ProjectConfig:
        return self._project

    def _get_build_path(self, ref: GitRef) -> Path:
        return self._path / BuildContext.quote_path(ref.ref) / BuildContext.quote_path(ref.short_commit + ".tmp")

    def _get_target_path(self, ref: GitRef) -> Path:
        return self._path / BuildContext.quote_path(ref.ref) / BuildContext.quote_path(ref.short_commit)
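
    # Resulting on-disk layout, e.g. for project "website" at refs/heads/main, commit aaaaaaaa:
    #   <workdir>/website/refs%2Fheads%2Fmain/aaaaaaaa.tmp   (build in progress)
    #   <workdir>/website/refs%2Fheads%2Fmain/aaaaaaaa       (finalized)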

    def build_exists(self, ref: GitRef) -> bool:
        try:
            if ref.commit not in self._ref_cache[ref.ref]:
                self._ref_cache[ref.ref][ref.commit] = \
                    self._get_build_path(ref).is_dir() or self._get_target_path(ref).is_dir()  # NB: check tmp first
            return self._ref_cache[ref.ref][ref.commit]
        except OSError as e:
            raise OSError(f"Cannot check {str(self)} for {str(ref)}: {str(e)}") from None

    def build_acquire(self, ref: GitRef) -> BuildContext:
        try:
            path: Path = self._get_build_path(ref)
            path.mkdir(parents=True, exist_ok=False)  # acts as lock, but called in sync anyway
            self._ref_cache[ref.ref][ref.commit] = True
        except OSError as e:
            raise OSError(f"Cannot create {str(ref)} in {str(self)}: {str(e)}") from None
        else:
            return BuildContext(self._project, ref, path, self._get_target_path(ref))

    def __str__(self) -> str:
        return f"<{self._path.as_posix()}>"


# endregion
# region Commands
####


class CommandEnv(UserDict[str, str]):
    pass


class OsCommandEnv(CommandEnv):
    """Pass along current environment variables, possibly overridden."""
    def __init__(self, extra: Optional[CommandEnv] = None) -> None:
        env: Dict[str, str] = os.environ.copy()
        if extra is not None:
            env.update(extra.data)
        super().__init__(env)


class BuildCommandEnv(CommandEnv):
    """Configured and build-specific environment, used also as explicit docker arguments/environment."""
    def __init__(self, project: ProjectConfig, ref: GitRef, build_dir: Path) -> None:
        env: Dict[str, str] = {"BUILD_DIR": build_dir.as_posix()}
        env.update(ref.as_env())
        env.update(project.as_env())
        super().__init__(env)
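
# For a hypothetical project "website" built at refs/heads/main, a BuildCommandEnv
# holds BUILD_DIR, BUILD_REF, BUILD_COMMIT, BUILD_SHORT_COMMIT, BUILD_BRANCH,
# BUILD_TAG, BUILD_PROJECT, and BUILD_REMOTE, plus any configured env/env_file variables.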


class CommandProxy:
    """Call a binary and process stdout/stderr with build logger and async callbacks."""

    T = TypeVar('T')

    class OutputCallback(Generic[T]):
        """Registered to receive raw or parsed output lines. The default implementation discards."""

        async def __call__(self, line: 'CommandProxy.T') -> None:
            pass

    class LoggerCallback(OutputCallback[str]):
        """Pass each output line to the build logger before passing to a chained parent."""

        def __init__(self, level: int, logger: logging.Logger, parent: 'CommandProxy.OutputCallback[str]') -> None:
            self._level: int = level
            self._logger: logging.Logger = logger
            self._parent: CommandProxy.OutputCallback[str] = parent
            self._prefix: str = "! " if level >= logging.WARNING else "> "

        async def __call__(self, line: str) -> None:
            self._logger.log(self._level, self._prefix + line)
            await self._parent(line)

    class CommandProcess:
        """Subprocess context manager to ensure termination upon exit or exception."""

        def __init__(self, logger: logging.Logger, args: List[str], env: CommandEnv, kill_timeout: float = 5.0) -> None:
            self._logger: logging.Logger = logger
            self._name: str = args[0]
            self._args: List[str] = args[1:]
            self._env: CommandEnv = env
            self._kill_timeout: float = kill_timeout
            self._process: Optional[asyncio.subprocess.Process] = None

        async def _cancel(self) -> None:
            if self._process is None or self._process.returncode is not None:
                return

            try:
                self._logger.warning(f"Terminating {self._name}")
                self._process.terminate()
                await asyncio.wait_for(self._process.wait(), self._kill_timeout)
            except (asyncio.TimeoutError, asyncio.CancelledError):
                pass
            except ProcessLookupError:
                return
            else:
                return

            try:
                self._logger.error(f"Killing {self._name}")
                self._process.kill()
            except ProcessLookupError:
                pass

        async def __aenter__(self) -> asyncio.subprocess.Process:
            """Execute without shell, in a new process group, and with stdout/stderr pipes."""
            try:
                self._process = await asyncio.create_subprocess_exec(
                    self._name, *self._args, env=self._env,
                    close_fds=True, start_new_session=True,
                    stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.DEVNULL,
                )
            except OSError as e:
                raise RuntimeError(f"Cannot run {self._name}: {str(e)}") from None
            else:
                return self._process

        async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
            if exc_type:
                self._logger.error(f"Cancelling {self._name} due to {exc_type.__name__}")
                await self._cancel()
            if self._process is not None and await self._process.wait() != 0:
                raise RuntimeError(f"{self._name} exited with {self._process.returncode}") from exc_type

    def __init__(self, logger: logging.Logger, build_logger: logging.Logger,
                 env: CommandEnv, timeout: Optional[float] = None) -> None:
        self._logger: logging.Logger = logger
        self._build_logger: logging.Logger = build_logger
        self._env: CommandEnv = env
        self._timeout: Optional[float] = timeout

    async def _readline(self, stream: Optional[asyncio.StreamReader]) -> AsyncIterator[str]:
        while stream is not None:
            try:
                line: bytes = await stream.readline()
            except ValueError as e:
                self._logger.error(f"Cannot read line: {str(e)}")
                continue
            except OSError as e:
                self._logger.error(f"Cannot read: {str(e)}")
                break
            if not len(line):
                break
            else:
                line = line.rstrip()
            if len(line):
                yield line.decode("utf-8", errors="replace")

    async def _log_and_tee(self, stream: Optional[asyncio.StreamReader], cb: OutputCallback[str]) -> None:
        async for line in self._readline(stream):
            await cb(line)

    async def exec(self, args: List[str],
                   cb_out: OutputCallback[str] = OutputCallback(),
                   cb_err: OutputCallback[str] = OutputCallback()) -> None:
        """Run command with timeout, processing output callbacks in the background."""

        self._build_logger.info(f"$ {' '.join(args)}")
        cb_out = CommandProxy.LoggerCallback(logging.DEBUG, self._build_logger, parent=cb_out)
        cb_err = CommandProxy.LoggerCallback(logging.WARNING, self._build_logger, parent=cb_err)

        readers: List[asyncio.Task[None]] = []
        try:
            async with self.CommandProcess(self._logger, args, self._env) as proc:
                readers = [asyncio.create_task(self._log_and_tee(proc.stdout, cb_out)),
                           asyncio.create_task(self._log_and_tee(proc.stderr, cb_err))]
                await asyncio.wait_for(proc.wait(), self._timeout)
        finally:
            await asyncio.gather(*readers, return_exceptions=True)


class Git(CommandProxy):
    """Git binary commands: ls-remote, clone, and checkout."""

    class LsRemoteParser(CommandProxy.OutputCallback[str]):
        """Parse the output of ls-remote into ref (branch/tag) and commit hash."""

        def __init__(self, logger: logging.Logger, parent: CommandProxy.OutputCallback[GitRef]) -> None:
            self._logger: logging.Logger = logger
            self._parent: CommandProxy.OutputCallback[GitRef] = parent

        async def __call__(self, line: str) -> None:
            try:
                commit, ref = line.split(maxsplit=1)
                git_ref: GitRef = GitRef(ref, commit)
            except ValueError as e:
                self._logger.warning(f"Cannot parse git ref, got '{line}': {str(e)}")
            else:
                await self._parent(git_ref)
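
    # Each ls-remote output line is "<40-hex-commit><TAB><ref>", for example:
    #   aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa  refs/heads/main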

    async def ls_remote(self, remote: GitConfig, cb: CommandProxy.OutputCallback[GitRef]) -> None:
        await self.exec([
            "git", "--no-pager", "ls-remote", "--heads", "--tags", "--refs", "--quiet", "--", remote.remote
        ], cb_out=Git.LsRemoteParser(self._logger, cb))

    async def clone(self, remote: GitConfig, ref: GitRef, path: Path) -> None:
        shallow_args: List[str] = [
            "--depth", "1", "--shallow-submodules", "--no-tags", "--single-branch", "--branch", ref.branch_or_tag
        ] if remote.shallow else []
        await self.exec(
            ["git", "--no-pager", "-C", path.parent.as_posix()] +
            ["clone", "--no-local", "--no-hardlinks", "--recurse-submodules", "--verbose"] +
            shallow_args +
            ["--", remote.remote, path.name]
        )

    async def checkout(self, remote: GitConfig, ref: GitRef, path: Path) -> None:
        await self.exec(
            ["git", "--no-pager", "-C", path.as_posix(), "-c", "advice.detachedHead=false"] +
            ["checkout", "--recurse-submodules", "--force", "--no-guess", "--detach", ref.commit]
        )

    async def clone_ref(self, remote: GitConfig, ref: GitRef, path: Path) -> None:
        # fetching an arbitrary commit might not be supported or allowed, so do a clone with checkout instead
        # note that the checkout will fail on a shallow clone if there is a new head in the meanwhile
        await self.clone(remote, ref, path)
        await self.checkout(remote, ref, path)


class Docker(CommandProxy):
    """Docker binary commands: build and run."""

    class DockerBuildParser(CommandProxy.OutputCallback[str]):
        """As neither --quiet nor tagging is forced, need to parse the build output for image id."""

        def __init__(self) -> None:
            self._patterns: List[re.Pattern[str]] = [
                re.compile(r"writing image sha256:(?P<image>[0-9a-f]+)"),
                re.compile(r"Successfully built (?P<image>[0-9a-f]+)"),
            ]
            self._last_match: Optional[str] = None

        async def __call__(self, line: str) -> None:
            for pattern in self._patterns:
                m: Optional[re.Match[str]] = pattern.search(line)
                if m is not None:
                    self._last_match = m.group("image")

        def get(self) -> str:
            if self._last_match is None:
                raise ValueError("Cannot parse build log for image")
            return self._last_match

    def _build_command(self, config: DockerConfig, expose_env: List[str]) -> Iterator[str]:
        """Get the config-dependent first part of the docker build command. Raises ValueError."""
        yield from ["docker", "build", "--rm", "--force-rm"]  # always remove intermediate containers
        if config.pull:  # always attempt to pull a newer version of the image
            yield "--pull"
        if not config.cache:  # do not use cache when building the image
            yield "--no-cache"
        if config.tag is not None:  # name and optionally a tag in name:tag format
            yield from ["--tag", config.tag.substitute(self._env)]
        if config.label is not None:  # set metadata for an image in name=value format
            yield from ["--label", config.label.substitute(self._env)]
        for var_name in expose_env:  # set build-time variables
            yield from ["--build-arg", var_name]

    def _run_command(self, config: DockerConfig, expose_env: List[str]) -> Iterator[str]:
        """Get the config-dependent first part of the docker run command. Raises ValueError."""
        yield from ["docker", "run", "--rm", "--pull", "never"]
        if config.read_only:  # mount the container's root filesystem as read only
            yield "--read-only"
        if config.docker_in_docker:  # XXX: assumes standard unix socket
            yield from ["-v", "/var/run/docker.sock:/var/run/docker.sock"]
        if config.label is not None:  # set metadata on a container in name=value format
            yield from ["--label", config.label.substitute(self._env)]
        if config.tmpfs > 0:  # mount a tmpfs directory
            yield from ["--tmpfs", f"/tmp:size={config.tmpfs}%,mode=1777,rw,relatime"]  # nosec
        for var_name in expose_env:  # set environment variables
            yield from ["--env", var_name]

    async def build(self, config: DockerConfig, path: Path, env: List[str]) -> str:
        cb: Docker.DockerBuildParser = Docker.DockerBuildParser()
        await self.exec(list(self._build_command(config, env)) +
                        ["--file", (path / config.dockerfile).as_posix(), "--", path.as_posix()],
                        cb_err=cb)
        return cb.get()

    async def run(self, config: DockerConfig, image: str, name: str, volume: Path, drop: Path, env: List[str]) -> None:
        await self.exec(list(self._run_command(config, env)) +
                        ["-v", f"{volume.as_posix()}:/build:rw", "-v", f"{drop.as_posix()}:/drop:rw"] +
                        ["--name", name, "--", image])


class Rsync(CommandProxy):
    @classmethod
    def _sync_command(cls, config: RsyncConfig, checkout: Path) -> Iterator[str]:
        yield from ["rsync", "--verbose", "--recursive", "--links"]
        if config.checksum:
            yield from ["--checksum"]
        if config.preserve:
            yield from ["--owner", "--group", "--perms", "--times"]
        if config.delete:
            yield from ["--delete-after", "--delete-excluded"]
        if config.include is not None:
            yield from ["--include-from", (checkout / config.include).as_posix()]
        else:
            yield from ["--exclude", ".git"]

    async def sync(self, config: RsyncConfig, checkout: Path, source: Path) -> None:
        if config.destination is not None:
            await self.exec(list(self._sync_command(config, checkout)) + [
                "--", source.as_posix() + "/", config.destination
            ])


# endregion
# region Runner
####


class Runner:
    """Perform the actual git and docker operations of a build task."""

    @dataclass(frozen=True)
    class RunId:
        """Unique identifier used for exclusively locking both branch/tag and commit per project."""
        project: str
        ref: str
        commit: str

    def __init__(self, build: BuildContext) -> None:
        self._build: BuildContext = build
        self._logger: logging.Logger = self._build.logger
        self._logger.info(f"Enqueueing {self._build}")

    @property
    def run_id(self) -> RunId:
        return Runner.RunId(project=self._build.project.name,
                            ref=self._build.ref.branch_or_tag,
                            commit=self._build.ref.short_commit)

    async def run(self) -> None:
        """Main callback doing the actual work."""
        self._logger.info(f"Running for {self._build}")
        try:
            build_logger: QueuedLogger = self._build.get_build_logger()
            async with build_logger as logger:
                env: CommandEnv = BuildCommandEnv(self._build.project, self._build.ref, self._build.root_dir)
                command_env: CommandEnv = OsCommandEnv(env)
                env_keys: List[str] = list(env.keys())

                git: Git = Git(self._logger, logger, command_env, self._build.project.timeout)
                docker: Docker = Docker(self._logger, logger, command_env, self._build.project.timeout)
                rsync: Rsync = Rsync(self._logger, logger, command_env, self._build.project.timeout)
                callback: CommandProxy = CommandProxy(self._logger, logger, command_env, self._build.project.timeout)

                try:
                    await git.clone_ref(self._build.project.remote, self._build.ref, self._build.build_dir)

                    if self._build.project.prepare_callback is not None:
                        await callback.exec([self._build.project.prepare_callback.as_posix()])

                    if self._build.project.docker.build:
                        image: str = await docker.build(self._build.project.docker, self._build.build_dir, env_keys)
                        name: str = f"{self._build.project.name}-{self._build.ref.short_commit}"
                        if self._build.project.docker.run:
                            await docker.run(self._build.project.docker, image, name,
                                             self._build.build_dir, self._build.drop_dir, env_keys)
                            await rsync.sync(self._build.project.rsync, self._build.build_dir, self._build.drop_dir)
                        else:
                            await rsync.sync(self._build.project.rsync, self._build.build_dir, self._build.build_dir)
                    else:
                        await rsync.sync(self._build.project.rsync, self._build.build_dir, self._build.build_dir)

                    if self._build.project.success_callback is not None:
                        await build_logger.flush()
                        await callback.exec([self._build.project.success_callback.as_posix()])
                except Exception:
                    if self._build.project.error_callback is not None:
                        await build_logger.flush()
                        await callback.exec([self._build.project.error_callback.as_posix()])
                    raise
                else:
                    self._build.finalize()  # NB: build logger file stream still active
        except asyncio.CancelledError:
            self._logger.error("Run cancelled")
        except asyncio.TimeoutError:
            self._logger.error("Run timeout")
        except (RuntimeError, ValueError, OSError) as e:
            self._logger.error(f"Run failed with {e.__class__.__name__}: {str(e)}")
        else:
            self._logger.info("Success")


class RemoteWatcher:
    """Watch git remotes for new commits that match the project configuration."""

    WorkQueue = asyncio.Queue[Optional[Tuple[str, GitRef]]]

    class RefCallback(CommandProxy.OutputCallback[GitRef]):
        """Parse git output for new refs to be submitted to the given queue."""

        def __init__(self, logger: logging.Logger, project: ProjectConfig, queue: 'RemoteWatcher.WorkQueue') -> None:
            self._logger: logging.Logger = logger
            self._project: ProjectConfig = project
            self._queue: RemoteWatcher.WorkQueue = queue
            self._cache: DefaultDict[str, str] = defaultdict(str)

        def _match(self, ref: GitRef) -> bool:
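            """True iff the ref matches a configured branch or tag pattern; with no patterns configured, nothing matches."""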
            if ref.branch is not None:
                if self._project.branches is not None and self._project.branches.fullmatch(ref.branch) is not None:
                    return True
            if ref.tag is not None:
                if self._project.tags is not None and self._project.tags.fullmatch(ref.tag) is not None:
                    return True
            return False

        async def __call__(self, ref: GitRef) -> None:
            if self._cache[ref.ref] != ref.commit:
                self._cache[ref.ref] = ref.commit
                if self._match(ref):
                    self._logger.info(f"{self._project.name} now at {ref}")
                    self._queue.put_nowait((self._project.name, ref))

    def __init__(self, projects: List[ProjectContext], interval: int) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._git_logger: logging.Logger = self._logger.getChild("git")
        self._interval: int = max(1, interval)

        self._shutdown: asyncio.Event = asyncio.Event()
        self._queue: RemoteWatcher.WorkQueue = asyncio.Queue()
        self._task: Optional[asyncio.Task[None]] = None

        self._projects: Dict[str, ProjectContext] = {_.project.name: _ for _ in projects}
        self._callbacks: Dict[str, RemoteWatcher.RefCallback] = {
            _.project.name: RemoteWatcher.RefCallback(self._logger, _.project, self._queue) for _ in projects
        }

    async def subscribe(self) -> AsyncIterator[Tuple[ProjectContext, GitRef]]:
        """Block on new refs to be enqueued as build tasks, until explicitly cancelled."""
        while self._task is not None and not self._shutdown.is_set():
            name_ref: Optional[Tuple[str, GitRef]] = await self._queue.get()
            if name_ref is None:
                self._shutdown.set()
            else:
                yield self._projects[name_ref[0]], name_ref[1]

    def cancel(self) -> None:
        self._shutdown.set()
        self._queue.put_nowait(None)

    async def _wait(self, timeout: float) -> bool:
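        """Sleep for up to timeout seconds; True means keep polling, False means shutdown was requested."""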
        try:
            return not await asyncio.wait_for(self._shutdown.wait(), timeout)
        except (asyncio.CancelledError, asyncio.TimeoutError):
            return True

    async def _worker(self) -> None:
        git: Git = Git(self._logger, self._git_logger, OsCommandEnv())
        interval: float = self._interval / len(self._projects)
        while True:
            for name, project in self._projects.items():
                if not await self._wait(interval):
                    self._queue.put_nowait(None)
                    return
                try:
                    await git.ls_remote(project.project.remote, self._callbacks[name])
                except Exception as e:
                    self._logger.error(str(e))  # but go on

    async def __aenter__(self) -> 'RemoteWatcher':
        if self._task is None and len(self._projects):
            self._logger.info(f"Watching {len(self._projects)} remotes")
            self._task = asyncio.create_task(self._worker())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        self.cancel()
        if self._task is not None:
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
            self._logger.info("Stopped remote watchdog")


# endregion
# region Tasks
####


class LimitProvider:
    """Ensure the globally configured max parallelism on build tasks. Also maintain per-project ref semaphores."""

    LockId = Tuple[str, ...]

    def __init__(self, global_limit: int) -> None:
        self._global_limit: asyncio.Semaphore = asyncio.BoundedSemaphore(max(1, global_limit))
        self._ref_lock: DefaultDict[LimitProvider.LockId, asyncio.Semaphore] = defaultdict(asyncio.BoundedSemaphore)
        self._commit_lock: DefaultDict[LimitProvider.LockId, asyncio.Semaphore] = defaultdict(asyncio.BoundedSemaphore)

    def global_limit(self) -> asyncio.Semaphore:
        return self._global_limit

    def ref_limit(self, run: Runner.RunId) -> asyncio.Semaphore:
        return self._ref_lock[(run.project, run.ref)]

    def commit_limit(self, run: Runner.RunId) -> asyncio.Semaphore:
        return self._commit_lock[(run.project, run.commit)]
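
# With e.g. --limit 2, at most two Runner.run() calls proceed concurrently overall,
# while the per-(project, ref) and per-(project, commit) semaphores serialize builds
# of the same branch/tag or commit (acquired in that fixed order by TaskSet._run below).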


class TaskSet:
    """Enqueue, maintain, and run all pending build tasks. No dependencies, so can outlive reconfigurations."""

    def __init__(self, limits: LimitProvider) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._limits: LimitProvider = limits
        self._tasks: Set[asyncio.Task[None]] = set()

    async def _run(self, runner: Runner) -> None:
        try:
            async with self._limits.global_limit():
                async with self._limits.ref_limit(runner.run_id):
                    async with self._limits.commit_limit(runner.run_id):
                        await runner.run()
        except asyncio.CancelledError:
            self._logger.warning("Runner task cancelled")
        except BaseException as e:
            self._logger.error("Runner task failed", exc_info=e)

    async def __aenter__(self) -> 'TaskSet':
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if exc_type:
            self.cancel()
        if len(self._tasks):
            self._logger.info(f"Waiting for {len(self._tasks)} remaining tasks")
        await asyncio.gather(*self._tasks, return_exceptions=True)  # ignoring exceptions, logged by task

    async def enqueue(self, runner: Runner) -> None:
        task: asyncio.Task[None] = asyncio.create_task(self._run(runner))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def cancel(self) -> None:
        if len(self._tasks):
            self._logger.warning(f"Cancelling {len(self._tasks)} tasks")
        for task in self._tasks:
            task.cancel()


class SignalHandler:
    """Put received signals on a stack and call registered callbacks."""

    Callback = Callable[[], None]

    class Signal(IntEnum):
        """Signal values in increasing severity and (accidentally also) integer value."""
        none = 0
        reconfigure = signal.SIGHUP
        term_soft = signal.SIGINT
        term = signal.SIGQUIT
        term_hard = signal.SIGTERM

    class Watcher:
        """Context manager to (un-)register a callback when a signal of the given severity (or higher) occurs."""

        def __init__(self, signals: 'SignalHandler', sig: 'SignalHandler.Signal', cb: 'SignalHandler.Callback') -> None:
            self._signals: SignalHandler = signals
            self._min_signal: SignalHandler.Signal = sig
            self._callback: SignalHandler.Callback = cb

        def __enter__(self) -> None:
            self._signals._register(self._min_signal, self._callback)

        def __exit__(self, exc_type, exc_val, exc_tb) -> None:
            self._signals._unregister(self._min_signal, self._callback)

    def __init__(self) -> None:
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._signals: Set[SignalHandler.Signal] = set()
        self._callbacks: DefaultDict[SignalHandler.Signal, Set[SignalHandler.Callback]] = defaultdict(set)

        loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
        for s in SignalHandler.Signal:
            if s > SignalHandler.Signal.none:
                loop.add_signal_handler(s.value, self._handler, s)

    def _handler(self, sig: 'SignalHandler.Signal') -> None:
        self._logger.warning(f"Received {sig.name} signal {sig.value}")
        self._signals.add(sig)
        for cb in [_ for min_sig, cbs in self._callbacks.items() for _ in cbs if sig >= min_sig]:  # NB: copy list
            cb()

    def _register(self, min_sig: 'SignalHandler.Signal', cb: Callable[[], None]) -> None:
        self._callbacks[min_sig].add(cb)
        if self.max_signal() >= min_sig:
            cb()

    def _unregister(self, min_sig: 'SignalHandler.Signal', cb: Callable[[], None]) -> None:
        self._callbacks[min_sig].discard(cb)

    def max_signal(self) -> Signal:
        """The highest-severity recorded signal."""
        return max(self._signals) if len(self._signals) else SignalHandler.Signal.none

    def reset(self) -> None:
        """Start recording signals from scratch."""
        self._signals.clear()
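
# Signal severity drives the main loop: SIGHUP (reconfigure) merely restarts the
# remote watcher with fresh configuration, SIGINT exits after draining queued builds,
# and SIGQUIT/SIGTERM additionally cancel running build tasks (see _run and _main).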


# endregion
# region Main
####


def _setup_logging() -> QueuedLogger:
    """Hard-coded logging subsystem configuration to (queued) stderr."""
    logging.raiseExceptions = False
    logging.logThreads = False
    logging.logMultiprocessing = False
    logging.logProcesses = False
    logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(name)s: %(message)s")
    logging.getLogger(f"{RemoteWatcher.__name__}.git").setLevel(logging.WARNING)
    return RootLogger()


async def _run(logger: logging.Logger, interval: int,
               tasks: TaskSet, signals: SignalHandler, projects: List[ProjectContext]) -> bool:
    """Per-configuration-time main loop watching projects for new refs and enqueueing build tasks."""
    async with RemoteWatcher(projects, interval) as watcher:
        with SignalHandler.Watcher(signals, SignalHandler.Signal.reconfigure, watcher.cancel):
            async for project, ref in watcher.subscribe():
                try:
                    if not project.build_exists(ref):
                        await tasks.enqueue(Runner(project.build_acquire(ref)))
                except OSError as e:
                    logger.error(str(e))
                    return False
    return True


async def _main(config: PipelineConfig, project_config: Path) -> bool:
    async with _setup_logging() as logger:
        setlocale(LC_ALL, "C")

        try:
            config.workdir.mkdir(parents=True, exist_ok=True)  # NB: default permissions and umask
            workdir: Path = config.workdir.resolve(strict=True)
        except OSError as e:
            logger.error(f"Cannot create '{config.workdir}': {str(e)}")
            return False
        else:
            logger.info(f"Using state in '{workdir}'")

        signals: SignalHandler = SignalHandler()
        tasks: TaskSet = TaskSet(LimitProvider(config.limit))
        with SignalHandler.Watcher(signals, SignalHandler.Signal.term, tasks.cancel):
            async with tasks as jobs:
                while signals.max_signal() < SignalHandler.Signal.term_soft:  # (re-)configure and run
                    signals.reset()
                    try:
                        projects: List[ProjectContext] = [
                            ProjectContext(_, workdir) for _ in Config.from_ini(project_config) if _.enabled
                        ]
                    except RuntimeError as e:
                        logger.error(str(e))
                        return False  # waits for background jobs
                    else:
                        logger.info(f"Configured {len(projects)} projects from '{project_config}'")
                        gc.collect()
                    if not await _run(logger, config.interval, jobs, signals, projects):
                        return False
        logger.info("Done.")
        return True


def _dump_config(workdir: Path, config: Path) -> int:
    class ConfigEncoder(json.JSONEncoder):
        def default(self, o: Any) -> Any:
            if isinstance(o, re.Pattern):
                return o.pattern
            elif isinstance(o, Template):
                return o.template
            elif isinstance(o, Path):
                return o.as_posix()
            elif isinstance(o, CommandEnv):
                return dict(o)
            elif isinstance(o, ProjectConfig):
                return asdict(o)
            else:
                return super().default(o)

    print(ConfigEncoder(ensure_ascii=False, sort_keys=True, indent=2).encode({
        project.name: {
            "config": project,
            "env": BuildCommandEnv(project, GitRef(ref="refs/heads/master", commit="0" * 40), workdir / "tmp")
        } for project in Config.from_ini(config)
    }))
    return 0


def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--limit", type=int, default=1, help="parallel build limit", metavar="NUM")
    parser.add_argument("--interval", type=int, default=60, help="git remote check interval", metavar="SECONDS")
    parser.add_argument("--workdir", type=Path, required=True, help="output and state directory", metavar="PATH")
    parser.add_argument("--config", type=Path, required=True, help="project configuration file", metavar="CONFIG.INI")
    parser.add_argument("--dump-config", action='store_const', const=True, default=False, help="dump config and exit")
    args = parser.parse_args()

    if args.dump_config:
        return _dump_config(args.workdir, args.config)

    return 0 if asyncio.run(_main(PipelineConfig(workdir=args.workdir,
                                                 limit=args.limit,
                                                 interval=args.interval), args.config)) else 1


if __name__ == "__main__":
    sys.exit(main())


####
# endregion